任务排序和重新进入

 季孙意如 发布于 2023-01-30 17:14

我有以下场景,我认为这可能很常见:

    有一个任务(一个UI命令处理程序)可以同步或异步完成.

    命令的到达速度可能比处理它们的速度快.

    如果命令已有待处理任务,则应对新命令处理程序任务进行排队并按顺序处理.

    每个新任务的结果可能取决于前一个任务的结果.

应该遵守取消,但为了简单起见,我想将其排除在本问题的范围之外.此外,线程安全(并发)不是必需的,但必须支持重入.

这是我想要实现的基本示例(作为控制台应用程序,为简单起见):

using System;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var asyncOp = new AsyncOp();

            Func> handleAsync = async (arg) =>
            {
                Console.WriteLine("this task arg: " + arg);

                //await Task.Delay(arg); // make it async

                return await Task.FromResult(arg); // sync
            };

            Console.WriteLine("Test #1...");
            asyncOp.RunAsync(() => handleAsync(1000));
            asyncOp.RunAsync(() => handleAsync(900));
            asyncOp.RunAsync(() => handleAsync(800));
            asyncOp.CurrentTask.Wait();

            Console.WriteLine("\nPress any key to continue to test #2...");
            Console.ReadLine();

            asyncOp.RunAsync(() =>
            {
                asyncOp.RunAsync(() => handleAsync(200));
                return handleAsync(100);
            });

            asyncOp.CurrentTask.Wait();
            Console.WriteLine("\nPress any key to exit...");
            Console.ReadLine();
        }

        // AsyncOp
        class AsyncOp
        {
            Task _pending = Task.FromResult(default(T));

            public Task CurrentTask { get { return _pending; } }

            public Task RunAsync(Func> handler)
            {
                var pending = _pending;
                Func> wrapper = async () =>
                {
                    // await the prev task
                    var prevResult = await pending;
                    Console.WriteLine("\nprev task result:  " + prevResult);
                    // start and await the handler
                    return await handler();
                };

                _pending = wrapper();
                return _pending;
            }
        }

    }
}

输出:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800

prev task result:  800
this task arg: 200
this task arg: 100

Press any key to exit...

它按照要求工作,直到在测试#2中引入重新引入:

asyncOp.RunAsync(() =>
{
    asyncOp.RunAsync(() => handleAsync(200));
    return handleAsync(100);
});

所需的输出应该是100,200而不是200,100因为有已经是一个悬而未决外任务100.这显然是因为内部任务同步执行,打破var pending = _pending; /* ... */ _pending = wrapper()了外部任务的逻辑.

如何使其适用于测试#2?

一种解决方案是为每个任务强制执行异步Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext().但是,我不想对可能在内部同步的命令处理程序强加异步执行.此外,我不想依赖于任何特定同步上下文的行为(即依赖于它Task.Factory.StartNew应该在创建的任务实际启动之前返回).

在现实生活中,我负责AsyncOp上面的内容,但无法控制命令处理程序(即内部的任何内容handleAsync).

1 个回答
  • 我几乎忘记了可以Task手动构建,而无需启动或安排它.然后,"Task.Factory.StartNew"vs"new Task(...).Start"让我回到正轨.我认为这是Task<TResult>构造函数实际上可能有用的少数情况之一,以及嵌套的任务(Task<Task<T>>)和Task.Unwrap():

    // AsyncOp
    class AsyncOp<T>
    {
        Task<T> _pending = Task.FromResult(default(T));
    
        public Task<T> CurrentTask { get { return _pending; } }
    
        public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
        {
            var pending = _pending;
            Func<Task<T>> wrapper = async () =>
            {
                // await the prev task
                var prevResult = await pending;
                Console.WriteLine("\nprev task result:  " + prevResult);
                // start and await the handler
                return await handler();
            };
    
            var task = new Task<Task<T>>(wrapper);
            var inner = task.Unwrap();
            _pending = inner;
    
            task.RunSynchronously(useSynchronizationContext ?
                TaskScheduler.FromCurrentSynchronizationContext() :
                TaskScheduler.Current);
    
            return inner;
        }
    }
    

    输出:

    Test #1...
    
    prev task result:  0
    this task arg: 1000
    
    prev task result:  1000
    this task arg: 900
    
    prev task result:  900
    this task arg: 800
    
    Press any key to continue to test #2...
    
    
    prev task result:  800
    this task arg: 100
    
    prev task result:  100
    this task arg: 200
    

    如果需要,现在AsyncOp通过添加lock保护来实现线程安全也很容易_pending.


    更新,下面是此模式的最新版本,它使用Task并且是线程安全的:

    // AsyncOp
    class AsyncOp<T>
    {
        Task<T> _pending = Task.FromResult(default(T));
    
        public Task<T> CurrentTask { get { return _pending; } }
    
        public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
        {
            var pending = _pending;
            Func<Task<T>> wrapper = async () =>
            {
                // await the prev task
                var prevResult = await pending;
                Console.WriteLine("\nprev task result:  " + prevResult);
                // start and await the handler
                return await handler();
            };
    
            var task = new Task<Task<T>>(wrapper);
            var inner = task.Unwrap();
            _pending = inner;
    
            task.RunSynchronously(useSynchronizationContext ?
                TaskScheduler.FromCurrentSynchronizationContext() :
                TaskScheduler.Current);
    
            return inner;
        }
    }
    

    2023-01-30 17:19 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有