我正在尝试将一些TPL异步集成到更大的Rx链中Observable.FromAsync
,就像在这个小例子中一样:
using System; using System.Reactive.Linq; using System.Threading.Tasks; namespace rxtest { class Program { static void Main(string[] args) { MainAsync().Wait(); } static async Task MainAsync() { await Observable.Generate(new Random(), x => true, x => x, x => x.Next(250, 500)) .SelectMany((x, idx) => Observable.FromAsync(async ct => { Console.WriteLine("start: " + idx.ToString()); await Task.Delay(x, ct); Console.WriteLine("finish: " + idx.ToString()); return idx; })) .Take(10) .LastOrDefaultAsync(); } } }
但是,我注意到这似乎同时启动所有异步任务,而不是一次启动它们,这会导致应用程序的内存使用量增加.在SelectMany
似乎作用并不比不同Merge
.
在这里,我看到这样的输出:
start: 0 start: 1 start: 2 ...
我想看:
start: 0 finish: 0 start: 1 finish: 1 start: 2 finish: 2 ...
我怎样才能做到这一点?
SelectMany
将a 更改为Select
a Concat
:
static async Task MainAsync() { await Observable.Generate(new Random(), x => true, x => x, x => x.Next(250, 500)) .Take(10) .Select((x, idx) => Observable.FromAsync(async ct => { Console.WriteLine("start: " + idx.ToString()); await Task.Delay(x, ct); Console.WriteLine("finish: " + idx.ToString()); return idx; })) .Concat() .LastOrDefaultAsync(); }
编辑 - 我将Take(10)向上移动了链,因为Generate不会阻止 - 所以它会阻止它逃跑.
将Select
每个事件项目分成一个流,表示将在Subscription上启动的异步任务.Concat
当前一个子流完成时,接受一个流流并订阅每个连续的子流,将所有流连接成一个单独的流.