好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RxJS中的Observable和Observer示例详解

引言

最近在项目当中别的小伙伴使用到了 Rxjs ,我一眼看上去有点懵,感觉挺复杂,挺绕的。于是抓紧补补课,然后就可以和小伙伴们一起交流怎么能优雅的使用 Rxjs 。由于内容比较多,会分为三篇来讲解说明

初识 RxJS中的 Observable 和 Observer 细说 RxJS中的 Operators 在谈 RxJS中的 Subject 和 Schedulers

概念

RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库。

RxJS 中管理和解决异步事件的几个关键点:

Observable: 表示未来值或事件的可调用集合的概念。 Observer: 是一个回调集合,它知道如何监听 Observable 传递的值。 Subscription: 表示一个 Observable 的执行,主要用于取消执行。 Operators :** 是纯函数,可以使用函数式编程风格来处理具有 map 、 filter 、 concat 、 reduce 等操作的集合。 Subject: 相当于一个EventEmitter,也是将一个值或事件多播到多个Observers的唯一方式。 Schedulers : 是控制并发的集中调度程序,允许我们在计算发生在 eg  setTimeout or  requestAnimationFrame 或者其它上时进行协调。

牛刀小试

我们通过在dom上绑定事件的小案例,感受一下Rxjs的魅力。

在dom绑定事件,我们通常这样处理

document.addEventListener('click', () => console.log('Clicked!'));

用Rxjs创建一个 observable ,内容如下

import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));

这时候我们简单升级一下,需要记录一下点击的数量

let count = 0;
document.addEventListener('click', () => console.log(`Clicked ${++count} times`));

用Rxjs可以隔离状态,

import { fromEvent, scan } from 'rxjs';
fromEvent(document, 'click')
  .pipe(scan((count) => count + 1, 0))
  .subscribe((count) => console.log(`Clicked ${count} times`));

可以看到,我们用到了 scan 操作符,该操作符的工作方式和数组的 reduce 类似,回调函数接收一个值, 回调的返回值作为下一次回调运行暴露的一个值。

通过上面的案例可以看出, RxJS 的强大之处在于它能够使用纯函数生成值。这意味着您的代码不太容易出错。 通常你会创建一个不纯的函数,你的代码的其他部分可能会弄乱你的状态。

这时候,需求又有变动了,要求我们一秒内只能有一次点击

let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {
    console.log(`Clicked ${++count} times`);
    lastClick = Date.now();
  }
});

使用Rxjs

fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    scan((count) => count + 1, 0)
  )
  .subscribe((count) => console.log(`Clicked ${count} times`));

RxJS 有一系列的操作符,可以帮助你控制事件如何在你的 observables 中流动。

这时候,我们要每次累计鼠标x的值

let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
});

使用Rxjs

import { fromEvent, throttleTime, map, scan } from 'rxjs';
fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    map((event) => event.clientX),
    scan((count, clientX) => count + clientX, 0)
  )
  .subscribe((count) => console.log(count));

从上面看可以通过 map 去转换 observables 的值。

Observable

我们先来写一个案例代码,大家可以猜下它的执行顺序

import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber测试数据plete();
  }, 1000);
});
console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

可以稍微想一下,正确的输出结果

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

怎么样,和大家想的结果一样吗,我们来一下分析一下。

Observable 剖析

Observable 有两种方式创建,一种是通过 new Observable() ,还有一种是通过 Rx.Observable.create() 的方式去创建。

Observable 核心的关注点:

创建 Observable 订阅 Observable 执行 Observable 取消 Observable

创建Observable

const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi')
  }, 1000);
});

该代码是创建一个 Observable ,然后每隔1s向订阅者发送消息。我们看到上边的回调函数是 subscribe , 该函数是描述 Observable 最重要的部分。

订阅Observable

observable.subscribe(x => console.log(x));

observable中的 subscribe 中参数是一个回调 x => console.log(x) ,官方叫它 Observer ,其实 Observer 有多种形式,后边我们会说到,在这里就简单理解, Observer 可以去消费数据,比如,在react中,我们这可以更新状态数据等。

执行Observable

 subscriber.next(1);   // Next 通知
 subscriber测试数据plete(); // 完成 通知
 subscriber.error(err);  // Error 通知

其实就是执行一个惰性计算,可同步可异步,

Observable Execution 可以传递三种类型的值:

Next :发送数值、字符串、对象等。 Error :发送 JavaScript 错误或异常。 complete :不发送值。

Next 通知是最重要和最常见的类型:它们代表传递给订阅者的实际数据。在 Observable 执行期间, Error 和 complete 通知可能只发生一次,并且只能有其中之一。

取消Observable

function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
  return function unsubscribe() {
    clearInterval(intervalId);
  };
}
const observable = new Observable(subscribe)
const unsubscribe = observable.subscribe({next: (x) => console.log(x)});
// Later:
unsubscribe(); // 取消执行

我们有看代码,创建了一个每秒输出一个 hi 内容的 Observable ,但在我们的使用场景中,会有取消改行为,这时候就需要返回一个 unsubscribe 的方法,用于取消。

Observer

我们在上边的场景中也提到了 Observer , 但什么是 Observer 呢,其实就是数据的消费者,先回顾一下上面的代码

observable.subscribe(x =&gt; console.log(x));

其实可以写成

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);

这样应就比较清晰了, observer 只是具有三个回调的对象,每一个用于 Observable 可能传递不同类型的通知。注意, observer 对象中的类型可以不必要全都写。

其实 observer 有许多变种,我们看下它的TS声明就比较清楚了。

可以直接传递一个 observer 对象,或者只传递一个 next 回调函数,在或者传多个可选的回调函数类型。

结束语

RxJS 不建议大家盲目的去用,一定要有合适的场景,盲目的去用可能会造成项目的复杂度会大幅度的提升。

以上就是RxJS中的Observable和Observer示例详解的详细内容,更多关于RxJS Observable Observer的资料请关注其它相关文章!

查看更多关于RxJS中的Observable和Observer示例详解的详细内容...

  阅读:31次