我有以下场景,我认为这可能很常见:
有一个任务(一个UI命令处理程序)可以同步或异步完成.
命令的到达速度可能比处理它们的速度快.
如果命令已有待处理任务,则应对新命令处理程序任务进行排队并按顺序处理.
每个新任务的结果可能取决于前一个任务的结果.
应该遵守取消,但为了简单起见,我想将其排除在本问题的范围之外.此外,线程安全(并发)不是必需的,但必须支持重入.
这是我想要实现的基本示例(作为控制台应用程序,为简单起见):
using System; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main(string[] args) { var asyncOp = new AsyncOp(); Func > handleAsync = async (arg) => { Console.WriteLine("this task arg: " + arg); //await Task.Delay(arg); // make it async return await Task.FromResult(arg); // sync }; Console.WriteLine("Test #1..."); asyncOp.RunAsync(() => handleAsync(1000)); asyncOp.RunAsync(() => handleAsync(900)); asyncOp.RunAsync(() => handleAsync(800)); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to continue to test #2..."); Console.ReadLine(); asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); }); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to exit..."); Console.ReadLine(); } // AsyncOp class AsyncOp { Task _pending = Task.FromResult(default(T)); public Task CurrentTask { get { return _pending; } } public Task RunAsync(Func > handler) { var pending = _pending; Func > wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; _pending = wrapper(); return _pending; } } } }
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 prev task result: 800 this task arg: 200 this task arg: 100 Press any key to exit...
它按照要求工作,直到在测试#2中引入重新引入:
asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); });
所需的输出应该是100
,200
而不是200
,100
因为有已经是一个悬而未决外任务100
.这显然是因为内部任务同步执行,打破var pending = _pending; /* ... */ _pending = wrapper()
了外部任务的逻辑.
如何使其适用于测试#2?
一种解决方案是为每个任务强制执行异步Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()
.但是,我不想对可能在内部同步的命令处理程序强加异步执行.此外,我不想依赖于任何特定同步上下文的行为(即依赖于它Task.Factory.StartNew
应该在创建的任务实际启动之前返回).
在现实生活中,我负责AsyncOp
上面的内容,但无法控制命令处理程序(即内部的任何内容handleAsync
).
我几乎忘记了可以Task
手动构建,而无需启动或安排它.然后,"Task.Factory.StartNew"vs"new Task(...).Start"让我回到正轨.我认为这是Task<TResult>
构造函数实际上可能有用的少数情况之一,以及嵌套的任务(Task<Task<T>>
)和Task.Unwrap()
:
// AsyncOp class AsyncOp<T> { Task<T> _pending = Task.FromResult(default(T)); public Task<T> CurrentTask { get { return _pending; } } public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false) { var pending = _pending; Func<Task<T>> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; var task = new Task<Task<T>>(wrapper); var inner = task.Unwrap(); _pending = inner; task.RunSynchronously(useSynchronizationContext ? TaskScheduler.FromCurrentSynchronizationContext() : TaskScheduler.Current); return inner; } }
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 this task arg: 100 prev task result: 100 this task arg: 200
如果需要,现在AsyncOp
通过添加lock
保护来实现线程安全也很容易_pending
.
更新,下面是此模式的最新版本,它使用Task
并且是线程安全的:
// AsyncOp class AsyncOp<T> { Task<T> _pending = Task.FromResult(default(T)); public Task<T> CurrentTask { get { return _pending; } } public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false) { var pending = _pending; Func<Task<T>> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; var task = new Task<Task<T>>(wrapper); var inner = task.Unwrap(); _pending = inner; task.RunSynchronously(useSynchronizationContext ? TaskScheduler.FromCurrentSynchronizationContext() : TaskScheduler.Current); return inner; } }