一次执行一个异步可观察

 雅婷婉君323 发布于 2023-02-12 16:51

我正在尝试将一些TPL异步集成到更大的Rx链中Observable.FromAsync,就像在这个小例子中一样:

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace rxtest
{
    class Program
    {
        static void Main(string[] args)
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            await Observable.Generate(new Random(), x => true,
                                      x => x, x => x.Next(250, 500))
                .SelectMany((x, idx) => Observable.FromAsync(async ct =>
                {
                    Console.WriteLine("start:  " + idx.ToString());
                    await Task.Delay(x, ct);
                    Console.WriteLine("finish: " + idx.ToString());
                    return idx;
                }))
                .Take(10)
                .LastOrDefaultAsync();
        }
    }
}

但是,我注意到这似乎同时启动所有异步任务,而不是一次启动它们,这会导致应用程序的内存使用量增加.在SelectMany似乎作用并不比不同Merge.

在这里,我看到这样的输出:

start:  0
start:  1
start:  2
...

我想看:

start:  0
finish: 0
start:  1
finish: 1
start:  2
finish: 2
...

我怎样才能做到这一点?

1 个回答
  • SelectMany将a 更改为Selecta Concat:

        static async Task MainAsync()
        {
            await Observable.Generate(new Random(), x => true,
                                      x => x, x => x.Next(250, 500))
                .Take(10)
                .Select((x, idx) => Observable.FromAsync(async ct =>
                {
                    Console.WriteLine("start:  " + idx.ToString());
                    await Task.Delay(x, ct);
                    Console.WriteLine("finish: " + idx.ToString());
                    return idx;
                }))
                .Concat()
                .LastOrDefaultAsync();
        }
    

    编辑 - 我将Take(10)向上移动了链,因为Generate不会阻止 - 所以它会阻止它逃跑.

    Select每个事件项目分成一个流,表示将在Subscription上启动的异步任务.Concat当前一个子流完成时,接受一个流流并订阅每个连续的子流,将所有流连接成一个单独的流.

    2023-02-12 16:53 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有