# Rxjs 介绍
参考手册
中文手册
RxJS 是 ReactiveX 编程理念的 javascript 版本。reactiveX 来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括 HTTP 请求,DOM 事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。
RxJS 相当于 es6 的 Promise,但是比 Promise 更强大,比如 RxJS 中可以中途撤回、可以发射多个值、并且提供了多种工具函数等。Angular 引入 RxJS 为了就是让异步可控、更简单。RxJS 里面提供了很多模块。这里我们主要了解下最常用的 Observable 和 fromEvent。
目前常见的异步编程的几种方法:
- 回调函数
- 事件监听 / 发布订阅
- Promise
- RxJS
# 创建项目
| D:\angular>ng new angulardemo09 |
| D:\angular>cd angulardemo09 |
| D:\angular\angulardemo09>ng g component components/home |
| D:\angular\angulardemo09>ng g service services/request |
| D:\angular\angulardemo09>ng serve --open |
# 简单演示
# services 服务提供数据
| |
| getData(){ |
| return 'this is service data' |
| } |
| |
| |
| getAsyncData(){ |
| setTimeout(() => { |
| let name = '张三-async'; |
| return name; |
| }, 1000) |
| } |
| |
| |
| getCallbackData(cb:any){ |
| setTimeout(() => { |
| let name = '张三-callback'; |
| cb(name); |
| return name; |
| }, 1000) |
| } |
| |
| |
| getPromiseData(){ |
| return new Promise((resolve, reject)=>{ |
| setTimeout(() => { |
| let name = '张三-Promise'; |
| resolve(name); |
| }, 1000) |
| }) |
| } |
| |
| |
| getRxJSData(){ |
| return new Observable((observer)=>{ |
| setTimeout(() => { |
| let name = '张三-rxJS'; |
| observer.next(name); |
| |
| }, 3000) |
| }) |
| } |
# 组件里面调用服务获取数据
| ngOnInit(): void { |
| |
| let data = this.request.getData(); |
| console.log(data); |
| |
| |
| let callbackData = this.request.getAsyncData(); |
| console.log(callbackData); |
| |
| |
| this.request.getCallbackData((data:any)=>{ |
| console.log(data) |
| }); |
| |
| |
| let promiseData = this.request.getPromiseData(); |
| promiseData.then((data)=>{ |
| console.log(data); |
| }) |
| |
| |
| let rxJSData = this.request.getRxJSData(); |
| rxJSData.subscribe((data)=>{ |
| console.log(data); |
| }) |
| } |
# RxJS 取消订阅
Promise 创建之后无法撤回,但是 RxJS 可以。同上面 services 里的例子:
# 服务
| |
| getRxJSData(){ |
| return new Observable((observer)=>{ |
| setTimeout(() => { |
| let name = '张三-rxJS'; |
| observer.next(name); |
| |
| }, 3000) |
| }) |
| } |
# 组件
| |
| let stream = this.request.getRxJSData(); |
| let d = stream.subscribe((data)=>{ |
| console.log(data); |
| }) |
| setTimeout(() => { |
| d.unsubscribe(); |
| }, 1000) |
# RxJS 订阅后多次执行
比如我们有个异步方法,要求每个 2s 返回数据(定时器),如果使用 Promise,那么这个方法只会执行 1 次,此时就可以使用 RxJS。每隔 2s 去拿数据。
# 测试 Promise 是否可以多次执行
# 服务提供数据
| |
| getPromiseIntervalData(){ |
| return new Promise((resolve)=>{ |
| setInterval(() => { |
| let name = '张三-Promise-interval'; |
| resolve(name); |
| }, 1000) |
| }) |
| } |
# 组件调用服务
| |
| let intervalData = this.request.getPromiseIntervalData(); |
| intervalData.then((data)=>{ |
| console.log(data); |
| }) |
# 测试 RxJS 是否可以多次执行
# 服务提供数据
| getRxJSIntervalData(){ |
| let count = 0; |
| return new Observable((observer)=>{ |
| setInterval(() => { |
| let name = '张三-rxJS-interval-' + count; |
| observer.next(name); |
| count++; |
| }, 1000) |
| }) |
# 组件调用服务
| let streamInterval = this.request.getRxJSIntervalData(); |
| streamInterval.subscribe((data)=>{ |
| console.log(data); |
| }) |
# 工具函数
Angular6.x 之前使用 RxJS 的工具函数 map 和 filter,filter 过滤数据,map 处理数据。
# 使用 filter 过滤偶数
# 服务提供数据
| getRxJSIntervalNum(){ |
| let count = 0; |
| return new Observable((observer)=>{ |
| setInterval(() => { |
| observer.next(count); |
| count++; |
| }, 1000) |
| }) |
| } |
# 组件调用数据并过滤
| let streamNum = this.request.getRxJSIntervalNum(); |
| streamNum.pipe(filter((value:any)=>{ |
| return value % 2 === 0; |
| })) |
| .subscribe((data)=>{ |
| console.log(data); |
| }) |
# 使用 map 处理数据
# 服务提供数据
| getRxJSIntervalNum(){ |
| let count = 0; |
| return new Observable((observer)=>{ |
| setInterval(() => { |
| observer.next(count); |
| count++; |
| }, 1000) |
| }) |
| } |
# 组件调用服务提供的数据并处理
| let streamNum = this.request.getRxJSIntervalNum(); |
| streamNum.pipe(map((value:any)=>{ |
| return value * value; |
| })) |
| .subscribe((data)=>{ |
| console.log(data); |
| }) |
# map 和 filter 结合使用
# 服务
| getRxJSIntervalNum(){ |
| let count = 0; |
| return new Observable((observer)=>{ |
| setInterval(() => { |
| observer.next(count); |
| count++; |
| }, 1000) |
| }) |
| } |
# 组件
| let streamNum = this.request.getRxJSIntervalNum(); |
| streamNum.pipe( |
| filter((value:any)=>{ |
| return value % 2 === 0; |
| }), |
| map((value:any)=>{ |
| return value * value; |
| }) |
| ).subscribe((data)=>{ |
| console.log(data); |
| }) |
# 详细代码
# 根组件
# app.module.js
| import { NgModule } from '@angular/core'; |
| import { BrowserModule } from '@angular/platform-browser'; |
| |
| import { AppComponent } from './app.component'; |
| import { HomeComponent } from './components/home/home.component'; |
| |
| import { RequestService } from './services/request.service'; |
| |
| @NgModule({ |
| declarations: [ |
| AppComponent, |
| HomeComponent |
| ], |
| imports: [ |
| BrowserModule |
| ], |
| providers: [RequestService], |
| bootstrap: [AppComponent] |
| }) |
| export class AppModule { } |
# app.component.html
# request 服务
# request.service.ts
| import { Injectable } from '@angular/core'; |
| |
| import { Observable } from 'rxjs' |
| |
| @Injectable({ |
| providedIn: 'root' |
| }) |
| export class RequestService { |
| |
| constructor() { } |
| |
| |
| getData(){ |
| return 'this is service data' |
| } |
| |
| |
| getAsyncData(){ |
| setTimeout(() => { |
| let name = '张三-async'; |
| return name; |
| }, 1000) |
| } |
| |
| |
| getCallbackData(cb:any){ |
| setTimeout(() => { |
| let name = '张三-callback'; |
| cb(name); |
| return name; |
| }, 1000) |
| } |
| |
| |
| getPromiseData(){ |
| return new Promise((resolve, reject)=>{ |
| setTimeout(() => { |
| let name = '张三-Promise'; |
| resolve(name); |
| }, 1000) |
| }) |
| } |
| |
| |
| getRxJSData(){ |
| return new Observable((observer)=>{ |
| setTimeout(() => { |
| let name = '张三-rxJS'; |
| observer.next(name); |
| |
| }, 3000) |
| }) |
| } |
| |
| |
| getPromiseIntervalData(){ |
| return new Promise((resolve)=>{ |
| setInterval(() => { |
| let name = '张三-Promise'; |
| resolve(name); |
| }, 1000) |
| }) |
| } |
| |
| getRxJSIntervalData(){ |
| let count = 0; |
| return new Observable((observer)=>{ |
| setInterval(() => { |
| let name = '张三-rxJS-interval-' + count; |
| observer.next(name); |
| count++; |
| }, 1000) |
| }) |
| } |
| |
| getRxJSIntervalNum(){ |
| let count = 0; |
| return new Observable((observer)=>{ |
| setInterval(() => { |
| observer.next(count); |
| count++; |
| }, 1000) |
| }) |
| } |
| } |
# home 组件
# home.component.ts
| import { Component, OnInit } from '@angular/core'; |
| |
| import { RequestService } from 'src/app/services/request.service'; |
| |
| import { map, filter } from 'rxjs/operators'; |
| |
| @Component({ |
| selector: 'app-home', |
| templateUrl: './home.component.html', |
| styleUrls: ['./home.component.scss'] |
| }) |
| export class HomeComponent implements OnInit { |
| |
| constructor(public request:RequestService) { |
| |
| } |
| |
| ngOnInit(): void { |
| |
| let data = this.request.getData(); |
| console.log(data); |
| |
| |
| let callbackData = this.request.getAsyncData(); |
| console.log(callbackData); |
| |
| |
| this.request.getCallbackData((data:any)=>{ |
| console.log(data) |
| }); |
| |
| |
| let promiseData = this.request.getPromiseData(); |
| promiseData.then((data)=>{ |
| console.log(data); |
| }) |
| |
| |
| let rxJSData = this.request.getRxJSData(); |
| rxJSData.subscribe((data)=>{ |
| console.log(data); |
| }) |
| |
| |
| let stream = this.request.getRxJSData(); |
| let d = stream.subscribe((data)=>{ |
| console.log(data); |
| }) |
| setTimeout(() => { |
| d.unsubscribe(); |
| }, 1000) |
| |
| |
| let intervalData = this.request.getPromiseIntervalData(); |
| intervalData.then((data)=>{ |
| console.log(data); |
| }) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| } |
| |
| } |