限制.NET 4.5中的并发任务量

 手机用户2502937013 发布于 2023-02-13 10:38

观察者以下功能:

public Task RunInOrderAsync(IEnumerable taskSeedGenerator,
    CreateTaskDelegate createTask,
    OnTaskErrorDelegate onError = null,
    OnTaskSuccessDelegate onSuccess = null) where TTaskSeed : class
{
    Action onFailed = (exc, taskSeed) =>
    {
        if (onError != null)
        {
            onError(exc, taskSeed);
        }
    };

    Action onDone = t =>
    {
        var taskSeed = (TTaskSeed)t.AsyncState;
        if (t.Exception != null)
        {
            onFailed(t.Exception, taskSeed);
        }
        else if (onSuccess != null)
        {
            onSuccess(t, taskSeed);
        }
    };

    var enumerator = taskSeedGenerator.GetEnumerator();
    Task task = null;
    while (enumerator.MoveNext())
    {
        if (task == null)
        {
            try
            {
                task = createTask(enumerator.Current);
                Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current));
            }
            catch (Exception exc)
            {
                onFailed(exc, enumerator.Current);
            }
        }
        else
        {
            task = task.ContinueWith((t, taskSeed) =>
            {
                onDone(t);
                var res = createTask((TTaskSeed)taskSeed);
                Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed));
                return res;
            }, enumerator.Current).TaskUnwrap();
        }
    }

    if (task != null)
    {
        task = task.ContinueWith(onDone);
    }

    return task;
}

TaskUnwrap国家保留标准的版本在哪里Task.Unwrap:

public static class Extensions
{
    public static Task TaskUnwrap(this Task task, object state = null)
    {
        return task.Unwrap().ContinueWith((t, _) =>
        {
            if (t.Exception != null)
            {
                throw t.Exception;
            }
        }, state ?? task.AsyncState);
    }
}

RunInOrderAsync方法允许异步运行N个任务,但是顺序地 - 一个接一个地运行.实际上,它运行从给定种子创建的任务,并发限制为1.

让我们假设createTask委托从种子创建的任务不对应于多个并发任务.

现在,我想抛出maxConcurrencyLevel参数,所以函数签名看起来像这样:

Task RunInOrderAsync(int maxConcurrencyLevel,
  IEnumerable taskSeedGenerator,
  CreateTaskDelegate createTask,
  OnTaskErrorDelegate onError = null,
  OnTaskSuccessDelegate onSuccess = null) where TTaskSeed : class

在这里,我有点卡住了.

SO有这样的问题:

System.Threading.Tasks - 限制并发任务的数量

基于任务的处理,使用.NET 4.5和c#限制并发任务数

.Net TPL:具有任务优先级的有限并发级别任务调度程序?

这基本上提出了两种方法来解决问题:

    使用Parallel.ForEachParallelOptions指定MaxDegreeOfParallelism属性值等于所需的最大并发级别.

    使用TaskScheduler具有所需MaximumConcurrencyLevel值的自定义.

第二种方法不会削减它,因为所涉及的所有任务必须使用相同的任务调度程序实例.为此,用于返回a的所有方法Task必须具有接受自定义TaskScheduler实例的重载.不幸的是,微软在这方面并不十分一致.例如,SqlConnection.OpenAsync不接受这样的论证(但TaskFactory.FromAsync确实如此).

第一种方法暗示我必须将任务转换为动作,如下所示:

() => t.Wait()

我不确定这是一个好主意,但我很乐意获得更多的意见.

另一种方法是利用TaskFactory.ContinueWhenAny,但这是混乱的.

有任何想法吗?

编辑1

我想澄清想要限制的原因.我们的任务最终针对同一SQL服务器执行SQL语句.我们想要的是一种限制并发传出SQL语句数量的方法.完全有可能会有其他SQL语句从其他代码段同时执行,但这一个是批处理器,可能会泛滥服务器.

现在,请注意,虽然我们谈论的是同一个SQL服务器,但同一台服务器上有许多数据库.因此,它不是限制开放SQL连接到同一数据库的数量,因为数据库可能根本不相同.

这就是为什么厄运日的解决方案ThreadPool.SetMaxThreads()无关紧要.

现在,关于SqlConnection.OpenAsync.由于某种原因它被异步 - 它可能会向服务器进行往返,因此可能会受到网络延迟和分布式环境的其他可爱副作用的影响.因此,它与接受TaskScheduler参数的其他异步方法没有什么不同.我倾向于认为不接受一个只是一个错误.

编辑2

我想保留原始函数的异步精神.因此,我希望避免任何明确的阻止解决方案.

编辑3

感谢@ fsimonazzi的回答,我现在已经实现了所需功能.这是代码:

        var sem = new SemaphoreSlim(maxConcurrencyLevel);
        var tasks = new List();

        var enumerator = taskSeedGenerator.GetEnumerator();
        while (enumerator.MoveNext())
        {
            tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) =>
            {
                Task task = null;
                try
                {
                    task = createTask((TTaskSeed)taskSeed);
                    if (task != null)
                    {
                        Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed));
                        task = task.ContinueWith(t =>
                        {
                            sem.Release();
                            onDone(t);
                        });
                    }
                }
                catch (Exception exc)
                {
                    sem.Release();
                    onFailed(exc, (TTaskSeed)taskSeed);
                }
                return task;
            }, enumerator.Current).TaskUnwrap());
        }

        return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose());

fsimonazzi.. 13

您可以使用信号量来限制处理.使用WaitAsync()方法可以获得预期的异步.像这样的东西(为简洁起见,删除了错误处理):

private static async Task DoStuff(int maxConcurrency, IEnumerable items, Func createTask)
{
    using (var sem = new SemaphoreSlim(maxConcurrency))
    {
        var tasks = new List();

        foreach (var item in items)
        {
            await sem.WaitAsync();
            var task = createTask(item).ContinueWith(t => sem.Release());
            tasks.Add(task);
        }

        await Task.WhenAll(tasks);
    }
}

编辑以删除在所有释放操作都有机会执行之前可以处置信号量的错误.

1 个回答
  • 您可以使用信号量来限制处理.使用WaitAsync()方法可以获得预期的异步.像这样的东西(为简洁起见,删除了错误处理):

    private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask)
    {
        using (var sem = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task>();
    
            foreach (var item in items)
            {
                await sem.WaitAsync();
                var task = createTask(item).ContinueWith(t => sem.Release());
                tasks.Add(task);
            }
    
            await Task.WhenAll(tasks);
        }
    }
    

    编辑以删除在所有释放操作都有机会执行之前可以处置信号量的错误.

    2023-02-13 10:41 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有