我有一个热的观察,我通常使用正常的Subject
下面实现,所以有兴趣的人可以订阅实时的通知流.
现在我想保留这个实时流,但是还要公开所有事件的历史流,这些事件已经并且具有绝对时间附加到这些通知以了解它们究竟发生的时间如果允许订阅者将历史流推进到任何在重播年表之前的时间点.
我相信大部分可以使用HistoricalScheduler及其AdvanceTo方法实现,但我不确定如何?
是否使用Timestamped来节省所需事件的时间?
并且是ReplaySubject需要缓存实时流成可再利用的HistoricalScheduler回放历史记录?
如何为同一来源实施这两个流,或者换句话说,如何将下面的内容用于当前的要求?
[见"重播过去"标题]
什么HistoricalScheduler
给你是控制的调度程序的虚拟时间向前运动的能力.
你没有得到的是随着时间的推移随机访问.随着虚拟时间的提前,执行预定的操作,因此必须提前安排它们.过去安排的任何行动 - 即在HistoricalScheduler.Now
价值后面的绝对时间- 立即执行.
要重放事件,您需要以某种方式记录它们,然后使用a的实例安排它们HistoricalScheduler
- 然后提前时间.
当您提前时间时,计划的操作将在其到期时间执行 - 并且当可观察量发送OnXXX()
给其订户时,Now
调度程序的属性将具有当前虚拟时间.
每个用户都需要访问它自己的调度程序,以便独立于其他用户控制时间.这实际上意味着为每个订户创建一个可观察的.
这是一个我敲了一个快速的例子(如果您引用了nuget包rx-main,它将在LINQPad中运行).
首先,我录制一个实时流(完全非生产方式!)将事件记录到列表中.正如你的建议,使用TimeStamp()
很好地捕捉时间:
/* record a live stream */ var source = Observable.Interval(TimeSpan.FromSeconds(1)); var log = source.Take(5).Timestamp().ToList().Wait(); Console.WriteLine("Time now is " + DateTime.Now);
现在我们可以使用HistoricalScheduler结合使用Generate来安排事件.请注意,这种方法可以防止大量预定事件提前排队 - 而不是我们一次只安排一个:
var scheduler = new HistoricalScheduler(); /* set up the scheduling of the recording events */ var replay = Observable.Generate( log.GetEnumerator(), events => events.MoveNext(), events => events, events => events.Current.Value, events => events.Current.Timestamp, scheduler);
现在,当我们同意,你可以看到HistoricalScheduler
的Now
财产有事件的虚拟时间:
replay.Subscribe( i => Console.WriteLine("Event: {0} happened at {1}", i, scheduler.Now));
最后我们可以开始计划(使用Start()只是尝试播放所有事件,而不是使用AdvanceTo
移动到特定时间 - 这就像做AdvanceTo(DateTime.MaxValue);
scheduler.Start();
我的输出是:
Time now is 07/01/2014 15:17:27 Event: 0 happened at 07/01/2014 15:17:23 +00:00 Event: 1 happened at 07/01/2014 15:17:24 +00:00 Event: 2 happened at 07/01/2014 15:17:25 +00:00 Event: 3 happened at 07/01/2014 15:17:26 +00:00 Event: 4 happened at 07/01/2014 15:17:27 +00:00
结果是你最终可能不得不通过这个工具创建自己的API来获得适合你特定目的的东西.它给你留下了相当多的工作 - 但它仍然是非常强大的东西.
有趣的是,实时可观察和重放的可观察对象看起来并没有什么不同 - 只要你记得总是参数化你的调度程序(!) - 这样就可以轻松地在它们上面运行相同的查询,并且时间查询都可以使用调度程序的虚拟时间.
我已经用它来测试旧数据的新查询,以便在商业场景中发挥很大作用.
它没有尝试的是传输控制,例如在GUI中来回滚动时间.通常,您以大块运行历史记录,存储新查询的输出,然后使用此数据随后在GUI中显示,以便用户可以通过您提供的其他一些机制来回移动.
最后,您不需要ReplaySubject
缓存直播流; 但是你确实需要一些记录事件来重放的方法 - 这可能只是一个写入日志的观察者.