热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

并行编程数据并行System.Threading.Tasks.Parallel类(.NET)

原文转载于:https:www.cnblogs.comspringsnowp9405016.html目录一、并行概念1、并行编程2、数据并行二、Parallel.Invoke&

原文转:https://www.cnblogs.com/springsnow/p/9405016.html

目录

  • 一、并行概念
    • 1、并行编程
    • 2、数据并行
  • 二、Parallel.Invoke():并行调用多个任务 。
  • 三、Parallel.For(): for 循环的并行运算 
  • 四、Parallel.ForEach():foreach 循环的并行运算 
  • 五、线程局部变量
    • 1、Parallel.For中定义局部变量:
    • 2、Parallel.Each中定义局部变量:
  • 六、Break、Stop中断与停止线程
  • 七、Cancel取消循环
  • 八、Handel Exceptions异常处理

 


回到顶部

一、并行概念


1、并行编程

      在.NET 4中的并行编程是依赖Task Parallel Library(后面简称为TPL) 实现的。在TPL中,最基本的执行单元是task(中文可以理解为"任务"),一个task就代表了你要执行的一个操作。你可以为你所要执行的每一个操作定义一个task,TPL就负责创建线程来执行你所定义的task,并且管理线程。TPL是面向task的,自动的;而传统的多线程是以人工为导向的。

现在已经进入了多核的时代,我们的程序如何更多的利用好硬件cpu,答案是并行处理。在.net4.0之前我们要开发并行的程序是非常的困难,在.net4.0中,在命名空间System.Threading.Tasks提供了方便的并行开发的类库。


2、数据并行

      数据并行指的是对源集合或数组的元素同时(即,并行)执行相同操作的场景。 在数据并行操作中,对源集合进行分区,以便多个线程能够同时在不同的网段上操作。

      任务并行库 (TPL) 支持通过 System.Threading.Tasks.Parallel 类实现的数据并行。 此类对 for 循环和 foreach 循环提供了基于方法的并行执行。你为Parallel.For 或 Parallel.ForEach 循环编写的循环逻辑与编写连续循环的相似。 无需创建线程或列工作项。 在基本循环中,不需要加锁。TPL 为你处理所有低级别的工作。

      Parallel.For()和Parallel.ForEach()方法多次调用同一个方法,而Parallel.Invoke()方法允许同时调用不同的方法。

回到顶部

二、Parallel.Invoke():并行调用多个任务 。

例1:同时调用2个任务

static void Main(string[] args)
{
var watch = Stopwatch.StartNew();
Parallel.Invoke(Run1, Run2);
watch.Stop();
Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds)
}static void Run1()
{
Console.WriteLine("我是任务一,我跑了3s");
Thread.Sleep(3000);
}static void Run2()
{
Console.WriteLine("我是任务二,我跑了5s");
Thread.Sleep(5000);
}

例2:说明并不是每个任务一个线程。

// 定义一个线程局部变量,返回其线程名
ThreadLocal ThreadName = new ThreadLocal(() =>
{
return "Thread" + Thread.CurrentThread.ManagedThreadId;
});// 打印出当前线程名的方法。
Action action = () =>
{
// 如果 ThreadName.IsValueCreated 为true,在这个线程上不是第一次运行这个方法。
bool repeat = ThreadName.IsValueCreated;
Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
};// 调用8个方法,你应该会看到一些重复的线程名
Parallel.Invoke(action, action, action, action, action, action, action, action);
ThreadName.Dispose();

image

回到顶部

三、Parallel.For(): for 循环的并行运算 

      我们知道串行代码中也有一个for,但是那个for并没有用到多核,而Paraller.for它会在底层根据硬件线程的运行状况来充分的使用所有的可利用的硬件线程,注意这里的Parallel.for的步行是1。

      在For()方法中,前两个参数定义了循环的开头和结束。示例从0迭代到9。第3个参数是一个 Action委托。整数参数是循环的迭代次数,该参数被传递给Action 委托引用的方法。 Parallel.For方法的返回类型是ParallelLoopResult结构,它提供了循环是否结束的信息。

ParallelLoopResult result = Parallel.For(0, 10, i =>
{
Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10);
});
Console.WriteLine(result.IsCompleted);

首先先写一个普通的循环:

private void NormalFor()
{
for (var i &#61; 0; i <10000; i&#43;&#43;)
{
for (var j &#61; 0; j <1000; j&#43;&#43;)
{
for (var k &#61; 0; k <100; k&#43;&#43;)
{
DoSomething();
}
}
}
}

再看一个并行的For语句&#xff1a;

private void ParallelFor()
{
Parallel.For(0, 10000, i &#61;>
{
for (int j &#61; 0; j <1000; j&#43;&#43;)
{
for (var k &#61; 0; k <100; k&#43;&#43;)
{
DoSomething();
}
}});
}

上面的例子中&#xff0c;只是将最外层的For语句替换成了Parallel.For&#xff0c;Parallel执行速度可以提高近一倍。

回到顶部

四、Parallel.ForEach&#xff08;&#xff09;&#xff1a;foreach 循环的并行运算 

private void NormalForeach()
{
foreach (var file in GetFiles())
{
DoSomething();
}}private void ParallelForeach()
{
Parallel.ForEach(GetFiles(), file &#61;> {
DoSomething();
});
}

ForEach的使用跟For使用几乎是差不多了&#xff0c;只是在对非泛型的Collection进行操作的时候&#xff0c;需要通过Cast方法进行转换。

ForEach的独到之处就是可以将数据进行分区&#xff0c;每一个小区内实现串行计算&#xff0c;分区采用Partitioner.Create实现。

for (int j &#61; 1; j <4; j&#43;&#43;)
{
Console.WriteLine("\n第{0}次比较", j);
ConcurrentBag bag &#61; new ConcurrentBag();
var watch &#61; Stopwatch.StartNew();
watch.Start();
for (int i &#61; 0; i <3000000; i&#43;&#43;)
{
bag.Add(i);
}
Console.WriteLine("串行计算&#xff1a;集合有:{0},总共耗时&#xff1a;{1}", bag.Count, watch.ElapsedMilliseconds);GC.Collect();
bag &#61; new ConcurrentBag();
watch &#61; Stopwatch.StartNew();
watch.Start();
Parallel.ForEach(Partitioner.Create(0, 3000000), i &#61;>
{
for (int m &#61; i.Item1; m {
bag.Add(m);
}
});
Console.WriteLine("并行计算&#xff1a;集合有:{0},总共耗时&#xff1a;{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
}

回到顶部

五、线程局部变量

下面这段代码多次运行每次的结果都不一样&#xff0c;因为total变量是公共的&#xff0c;而我们的程序是多个线程的加&#xff0c;而多个线程之间是不能把数据共享的。

public void NormalParallelTest()
{
int[] nums &#61; Enumerable.Range(0, 1000000).ToArray();
long total &#61; 0;
Parallel.For(0,nums.Length,i&#61;>
{
total &#43;&#61; nums[i];
});
Console.WriteLine("The total is {0}", total);
}

其实我们需要的是在每个线程中计算出一个和值&#xff0c;然后再进行累加。我们来看看线程局部变量&#xff1a;

泛型方法Parallel.For的原型&#xff1a;

public static ParallelLoopResult For
(int fromInclusive, int toExclusive, Func localInit, Func body,Action localFinally);

  • TLocal:线程变量的类型&#xff1b;第一个、第二个参数就不必多说了&#xff0c;就是起始值跟结束值。
  • localInit&#xff1a;每个线程的线程局部变量初始值的设置&#xff1b;
  • body&#xff1a;每次循环执行的方法&#xff0c;其中方法的最后一个参数就是线程局部变量&#xff1b;
  • localFinally&#xff1a;每个线程之后执行的方法。

1、Parallel.For中定义局部变量&#xff1a;

从2开始&#xff0c;累加2个&#xff0c;得49.

int[] nums &#61; Enumerable.Range(0, 10).ToArray();
long total &#61; 0;Parallel.For(0, nums.Length, () &#61;> { return 2; },(j, loop, subtotal) &#61;>//1、每次循环执行的方法
{
subtotal &#43;&#61; nums[j];
Console.WriteLine("主体&#xff1a; thread {1}, task {2},结果&#xff1a;{0}", j&#43; ":" &#43;nums[j] &#43; "-" &#43; subtotal, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);return subtotal;
},(x) &#61;>//2、每个线程执行之后执行的方法
{Console.WriteLine(" 最终执行&#xff1a;thread {1}, task {2}&#xff0c;结果&#xff1a;{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
Interlocked.Add(ref total, x);
});
Console.WriteLine("The total is {0}", total);

image


2、Parallel.Each中定义局部变量&#xff1a;

要注意的是&#xff0c;我们必须要使用ForEach&#xff0c;因为第一个参数表示的是迭代源的类型&#xff0c;第二个表示的是线程局部变量的类型&#xff0c;其方法的参数跟For是差不多的。

public void ForeachThreadLocalTest()
{int[] nums &#61; Enumerable.Range(0, 1000000).ToArray();long total &#61; 0;Parallel.ForEach(nums,()&#61;>0,(member,loopState,subTotal)&#61;>//1、每次循环执行的方法{subTotal &#43;&#61; member;return subTotal;},(perLocal)&#61;>//2、每个线程执行之后执行的方法Interlocked.Add(ref total,perLocal));Console.WriteLine("The total is {0}", total);
}

回到顶部

六、Break、Stop中断与停止线程

      在并行循环的委托参数中提供了一个ParallelLoopState&#xff0c;该实例提供了Break和Stop方法来帮我们实现。

  • Break“中断”&#xff1a;表示完成当前线程上当前迭代之前的所有线程上的所有迭代&#xff0c;然后退出循环。&#xff08;比如并行计算正在迭代100&#xff0c;那么break后程序还会迭代所有小于100的。&#xff09;
  • Stop“停止”&#xff1a;表示在方便的情况下尽快停止所有迭代。&#xff08;比如正在迭代100突然遇到stop&#xff0c;那它啥也不管了&#xff0c;直接退出。&#xff09;

首先我们可以看到在Parallel.For的一个重载方法&#xff1a;

public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action body)

在委托的最后一个参数类型为ParallelLoopState,而ParallelLoopState里面提供给我们两个方法&#xff1a;Break、Stop来终止迭代。

private void StopLoop()
{var Stack &#61; new ConcurrentStack();Parallel.For(0, 10000, (i, loopState) &#61;>{if (i <1000)Stack.Push(i.ToString());else{loopState.Stop();return;}});Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);
}

 

回到顶部

七、Cancel取消循环

      在并行的循环中支持通过传递ParallelOptions参数中的CancellationToken进行取消循环的控制&#xff0c;我们可以CancellationTokenSource实例化之后传递给ParallelOptions对象Cancellation值。下面来看个示例&#xff1a;

      在For循环的实现代码内部&#xff0c;Parallel类验证CancellationToken 的结果&#xff0c;并取消操作。一旦取消操作&#xff0c;For&#xff08;&#xff09;方法就抛出个OperationCanceledException类型的异常&#xff0c;这是本例捕获的异常。使用 CancellationTokeri可以注册取消操作时的信息。为此&#xff0c;需要调用Register方法&#xff0c;并传递一个在取消 操作时调用的委托。

var cts &#61; new CancellationTokenSource();
cts.Token.Register(() &#61;>Console.WriteLine("*** token canceled"));// start a task that sends a cancel after 500 ms
new Task(() &#61;>
{
Thread.Sleep(500);
cts.Cancel(false);
}).Start();try
{
ParallelLoopResult result &#61;
Parallel.For(0, 100,
new ParallelOptions()
{
CancellationToken &#61; cts.Token,
},
x &#61;>
{
Console.WriteLine("loop {0} started", x);
int sum &#61; 0;
for (int i &#61; 0; i <100; i&#43;&#43;)
{
Thread.Sleep(2);
sum &#43;&#61; i;
}
Console.WriteLine("loop {0} finished", x);
});
}
catch (OperationCanceledException ex)
{
Console.WriteLine(ex.Message);
}

回到顶部

八、Handel Exceptions异常处理

      在处理并行循环的异常的与顺序循环异常的处理是有所不同的,并行循环里面可能会一个异常在多个循环中出现,或则一个线程上的异常导致另外一个线程上也出现异常。比较好的处理方式就是&#xff0c;首先获取所有的异常最后通过AggregateException来包装所有的循环的异常&#xff0c;循环结束后进行throw。看一段示例代码&#xff1a;

private void HandleNumbers(int[] numbers)
{var exceptions &#61; new ConcurrentQueue();Parallel.For(0, numbers.Length, i &#61;> {try{if (numbers[i] > 10 && numbers[i] <20){throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i));}}catch (Exception e){exceptions.Enqueue(e);}});if (exceptions.Count > 0) throw new AggregateException(exceptions); }

测试方法&#xff1a;

public void HandleExceptions()
{
var numbers &#61; Enumerable.Range(0, 10000).ToArray();
try
{
this.HandleNumbers(numbers);
}
catch(AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}
}

对上面的方法说明下&#xff0c;在HandleNumbers方法中&#xff0c;就是一个小的demo如果元素的值出现在10-20之间就抛出异常。在上面我们的处理方法就是&#xff1a;在循环时通过队列将所有的异常都集中起来&#xff0c;循环结束后来抛出一个AggregateException。


推荐阅读
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • EPPlus绘制刻度线的方法及示例代码
    本文介绍了使用EPPlus绘制刻度线的方法,并提供了示例代码。通过ExcelPackage类和List对象,可以实现在Excel中绘制刻度线的功能。具体的方法和示例代码在文章中进行了详细的介绍和演示。 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • 本文介绍了深入浅出Linux设备驱动编程的重要性,以及两种加载和删除Linux内核模块的方法。通过一个内核模块的例子,展示了模块的编译和加载过程,并讨论了模块对内核大小的控制。深入理解Linux设备驱动编程对于开发者来说非常重要。 ... [详细]
  • 本文介绍了在wepy中运用小顺序页面受权的计划,包含了用户点击作废后的从新受权计划。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • 本文介绍了如何在Mac上使用Pillow库加载不同于默认字体和大小的字体,并提供了一个简单的示例代码。通过该示例,读者可以了解如何在Python中使用Pillow库来写入不同字体的文本。同时,本文也解决了在Mac上使用Pillow库加载字体时可能遇到的问题。读者可以根据本文提供的示例代码,轻松实现在Mac上使用Pillow库加载不同字体的功能。 ... [详细]
  • 本文讨论了编写可保护的代码的重要性,包括提高代码的可读性、可调试性和直观性。同时介绍了优化代码的方法,如代码格式化、解释函数和提炼函数等。还提到了一些常见的坏代码味道,如不规范的命名、重复代码、过长的函数和参数列表等。最后,介绍了如何处理数据泥团和进行函数重构,以提高代码质量和可维护性。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
author-avatar
兜兜岁月真伟大
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有