观察者以下功能:
public Task RunInOrderAsync(IEnumerable taskSeedGenerator, CreateTaskDelegate createTask, OnTaskErrorDelegate onError = null, OnTaskSuccessDelegate onSuccess = null) where TTaskSeed : class { Action onFailed = (exc, taskSeed) => { if (onError != null) { onError(exc, taskSeed); } }; Action onDone = t => { var taskSeed = (TTaskSeed)t.AsyncState; if (t.Exception != null) { onFailed(t.Exception, taskSeed); } else if (onSuccess != null) { onSuccess(t, taskSeed); } }; var enumerator = taskSeedGenerator.GetEnumerator(); Task task = null; while (enumerator.MoveNext()) { if (task == null) { try { task = createTask(enumerator.Current); Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current)); } catch (Exception exc) { onFailed(exc, enumerator.Current); } } else { task = task.ContinueWith((t, taskSeed) => { onDone(t); var res = createTask((TTaskSeed)taskSeed); Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed)); return res; }, enumerator.Current).TaskUnwrap(); } } if (task != null) { task = task.ContinueWith(onDone); } return task; }
TaskUnwrap
国家保留标准的版本在哪里Task.Unwrap
:
public static class Extensions { public static Task TaskUnwrap(this Tasktask, object state = null) { return task.Unwrap().ContinueWith((t, _) => { if (t.Exception != null) { throw t.Exception; } }, state ?? task.AsyncState); } }
该RunInOrderAsync
方法允许异步运行N个任务,但是顺序地 - 一个接一个地运行.实际上,它运行从给定种子创建的任务,并发限制为1.
让我们假设createTask
委托从种子创建的任务不对应于多个并发任务.
现在,我想抛出maxConcurrencyLevel参数,所以函数签名看起来像这样:
Task RunInOrderAsync(int maxConcurrencyLevel, IEnumerable taskSeedGenerator, CreateTaskDelegate createTask, OnTaskErrorDelegate onError = null, OnTaskSuccessDelegate onSuccess = null) where TTaskSeed : class
在这里,我有点卡住了.
SO有这样的问题:
System.Threading.Tasks - 限制并发任务的数量
基于任务的处理,使用.NET 4.5和c#限制并发任务数
.Net TPL:具有任务优先级的有限并发级别任务调度程序?
这基本上提出了两种方法来解决问题:
使用Parallel.ForEach
与ParallelOptions
指定MaxDegreeOfParallelism
属性值等于所需的最大并发级别.
使用TaskScheduler
具有所需MaximumConcurrencyLevel
值的自定义.
第二种方法不会削减它,因为所涉及的所有任务必须使用相同的任务调度程序实例.为此,用于返回a的所有方法Task
必须具有接受自定义TaskScheduler
实例的重载.不幸的是,微软在这方面并不十分一致.例如,SqlConnection.OpenAsync
不接受这样的论证(但TaskFactory.FromAsync
确实如此).
第一种方法暗示我必须将任务转换为动作,如下所示:
() => t.Wait()
我不确定这是一个好主意,但我很乐意获得更多的意见.
另一种方法是利用TaskFactory.ContinueWhenAny
,但这是混乱的.
有任何想法吗?
编辑1
我想澄清想要限制的原因.我们的任务最终针对同一SQL服务器执行SQL语句.我们想要的是一种限制并发传出SQL语句数量的方法.完全有可能会有其他SQL语句从其他代码段同时执行,但这一个是批处理器,可能会泛滥服务器.
现在,请注意,虽然我们谈论的是同一个SQL服务器,但同一台服务器上有许多数据库.因此,它不是限制开放SQL连接到同一数据库的数量,因为数据库可能根本不相同.
这就是为什么厄运日的解决方案ThreadPool.SetMaxThreads()
无关紧要.
现在,关于SqlConnection.OpenAsync
.由于某种原因它被异步 - 它可能会向服务器进行往返,因此可能会受到网络延迟和分布式环境的其他可爱副作用的影响.因此,它与接受TaskScheduler
参数的其他异步方法没有什么不同.我倾向于认为不接受一个只是一个错误.
编辑2
我想保留原始函数的异步精神.因此,我希望避免任何明确的阻止解决方案.
编辑3
感谢@ fsimonazzi的回答,我现在已经实现了所需功能.这是代码:
var sem = new SemaphoreSlim(maxConcurrencyLevel); var tasks = new List(); var enumerator = taskSeedGenerator.GetEnumerator(); while (enumerator.MoveNext()) { tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) => { Task task = null; try { task = createTask((TTaskSeed)taskSeed); if (task != null) { Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed)); task = task.ContinueWith(t => { sem.Release(); onDone(t); }); } } catch (Exception exc) { sem.Release(); onFailed(exc, (TTaskSeed)taskSeed); } return task; }, enumerator.Current).TaskUnwrap()); } return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose());
fsimonazzi.. 13
您可以使用信号量来限制处理.使用WaitAsync()方法可以获得预期的异步.像这样的东西(为简洁起见,删除了错误处理):
private static async Task DoStuff(int maxConcurrency, IEnumerable items, Func createTask) { using (var sem = new SemaphoreSlim(maxConcurrency)) { var tasks = new List (); foreach (var item in items) { await sem.WaitAsync(); var task = createTask(item).ContinueWith(t => sem.Release()); tasks.Add(task); } await Task.WhenAll(tasks); } }
编辑以删除在所有释放操作都有机会执行之前可以处置信号量的错误.
您可以使用信号量来限制处理.使用WaitAsync()方法可以获得预期的异步.像这样的东西(为简洁起见,删除了错误处理):
private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask) { using (var sem = new SemaphoreSlim(maxConcurrency)) { var tasks = new List<Task>(); foreach (var item in items) { await sem.WaitAsync(); var task = createTask(item).ContinueWith(t => sem.Release()); tasks.Add(task); } await Task.WhenAll(tasks); } }
编辑以删除在所有释放操作都有机会执行之前可以处置信号量的错误.