什么是Subject?在RxJS中,Subject是一类特别的Observable,它能够向多个Observer多路推送数值。一般的Observable并不具有多路推送的才能(每个
什么是Subject? 在RxJS中,Subject是一类特别的Observable,它能够向多个Observer多路推送数值。一般的Observable并不具有多路推送的才能(每个Observer都有本身自力的实行环境),而Subject能够同享一个实行环境。
Subject是一种能够多路推送的可视察对象。与EventEmitter类似,Subject保护着本身的Observer。
每个Subject都是一个Observable(可视察对象) 关于一个Subject,你能够定阅(subscribe
)它,Observer会和平常一样接收到数据。从Observer的视角看,它并不能辨别本身的实行环境是一般Observable的单路推送照样基于Subject的多路推送。
Subject的内部完成中,并不会在被定阅(subscribe
)后建立新的实行环境。它仅仅会把新的Observer注册在由它本身保护的Observer列表中,这和其他言语、库中的addListener
机制类似。
每个Subject也能够作为Observer(视察者) Subject一样也是一个由next(v)
,error(e)
,和 complete()
这些要领构成的对象。挪用next(theValue)
要领后,Subject会向一切已在其上注册的Observer多路推送theValue
。
下面的例子中,我们在Subject上注册了两个Observer,而且多路推送了一些数值:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
掌握台输出效果以下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
既然Subject是一个Observer,你能够把它作为subscribe
(定阅)一般Observable时的参数,以下面例子所示:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // 你能够通报Subject来定阅observable
实行后效果以下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
经由历程上面的完成:我们发明能够经由历程Subject将一般的Observable单路推送转换为多路推送。这说明了Subject的作用——作为单路Observable转变为多路Observable的桥梁。
另有几种特别的Subject
范例,分别是BehaviorSubject
,ReplaySubject
,和 AsyncSubject
。
多路推送的Observable
在今后的语境中,每当提到“多路推送的Observable”,我们特指经由历程Subject构建的Observable实行环境。不然“一般的Observable”只是一个不会同享实行环境而且被定阅后才见效的一系列值。
经由历程运用Subject能够建立具有雷同实行环境的多路的Observable。
下面展现了多路
的运作体式格局:Subject从一般的Observable定阅了数据,然后其他Observer又定阅了这个Subject,示例以下:
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// 经由历程`subject.subscribe({...})`定阅Subject的Observer:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// 让Subject从数据源定阅最先见效:
multicasted.connect();
multicast
要领返回一个类似于Observable的可视察对象,然则在其被定阅后,它会表现Subject的特征。 multicast
返回的对象同时是ConnectableObservable
范例的,具有connect()
要领。
connect()
要领异常的主要,它决议Observable什么时候最先实行。因为挪用connect()
后,Observable最先实行,因而,connect()
会返回一个Subscription
供挪用者来住手实行。
援用计数
经由历程手动挪用connect()
返回的Subscription掌握实行非常冗杂。一般,我们愿望在有第一个Observer定阅Subject后自动connnect
,当一切Observer都作废定阅后住手这个Subject。
我们来剖析一下下面例子中subscription的历程:
第一个Observer 定阅了多路推送的 Observable
多路Observable被衔接
向第一个Observer发送 值为0
的next
关照
第二个Observer定阅了多路推送的 Observable
向第一个Observer发送 值为1
的next
关照
向第二个Observer发送 值为1
的next
关照
第一个Observer作废了对多路推送的Observable的定阅
向第二个Observer发送 值为2
的next
关照
第二个Observer作废了对多路推送的Observable的定阅
作废对多路推送的Observable的衔接
经由历程显式地挪用connect()
,代码以下:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subscriptiOnConnect= multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe();
}, 2000);
假如你不想显式地挪用connect()
要领,能够在ConnectableObservable范例的Observable上挪用refCount()
要领。要领会举行援用计数:纪录Observable被定阅的行动。当定阅数从 0
到 1
时refCount()
会挪用connect()
要领。到定阅数从1
到 0
,他会住手全部实行历程。
refCount
使得多路推送的Observable在被定阅后自动实行,在一切视察者作废定阅后,住手实行。
下面是示例:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
实行输出效果以下:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
只要ConnectableObservables具有refCount()
要领,挪用后会返回一个Observable
而不是新的ConnectableObservable。
BehaviorSubject
BehaviorSubject
是Subject的一个衍生类,具有“最新的值”的观点。它老是保留近来向数据消费者发送的值,当一个Observer定阅后,它会马上从BehaviorSubject
收到“最新的值”。
BehaviorSubjects异常适于示意“随时候推移的值”。举一个抽象的例子,Subject示意一个人的华诞,而Behavior则示意一个人的年龄。(华诞只是一天,一个人的年龄会坚持到下一次华诞之前。)
下面例子中,展现了如何用 0
初始化BehaviorSubject,当Observer定阅它时,0
是第一个被推送的值。紧接着,在第二个Observer定阅BehaviorSubject之前,它推送了2
,虽然定阅在推送2
以后,然则第二个Observer依然能接遭到2
:
var subject = new Rx.BehaviorSubject(0 /* 初始值 */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
输出效果以下:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject
ReplaySubject
如同于BehaviorSubject
是 Subject
的子类。经由历程 ReplaySubject
能够向新的定阅者推送旧数值,就像一个录像机ReplaySubject
能够纪录Observable的一部分状况(过去时候内推送的值)。
.一个ReplaySubject
能够纪录Observable实行历程当中推送的多个值,并向新的定阅者回放它们。
你能够指定回放值的数目:
var subject = new Rx.ReplaySubject(3 /* 回放数目 */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
输出以下:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
除了回放数目,你也能够以毫秒为单元去指定“窗口时候”,决议ReplaySubject纪录多久之前Observable推送的数值。下面的例子中,我们把回放数目设置为100
,把窗口时候设置为500
毫秒:
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
var i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
第二个Observer接遭到3
(600ms), 4
(800ms) 和 5
(1000ms),这些值均在定阅之前的500
毫秒内推送(窗口长度 1000ms &#8211; 600ms = 400ms <500ms):
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
AsyncSubject
AsyncSubject是Subject的别的一个衍生类,Observable仅会在实行完成后,推送实行环境中的末了一个值。
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
输出效果以下:
observerA: 5
observerB: 5
AsyncSubject 与 last()
操作符类似,守候完成关照后推送实行历程的末了一个值。