热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

java多线程执行任务_java多线程处理并行任务

在多线程编程过程中,遇到这样的情况,主线程需要等待多个子线程的处理结果,才能继续运行下去。个人给这样的子线程任务取了个名字叫并行任务。对于

在多线程编程过程中,遇到这样的情况,主线程需要等待多个子线程的处理结果,才能继续运行下去。个人给这样的子线程任务取了个名字叫并行任务。对于这种任务,每次去编写代码加锁控制时序,觉得太麻烦,正好朋友提到CountDownLatch这个类,于是用它来编写了个小工具。

首先,要处理的是多个任务,于是定义了一个接口

packagecom.zyj.thread;importcom.zyj.exception.ChildThreadException;/*** 多任务处理

*@authorzengyuanjun*/

public interfaceMultiThreadHandler {/*** 添加任务

*@paramtasks*/

voidaddTask(Runnable... tasks);/*** 执行任务

*@throwsChildThreadException*/

void run() throwsChildThreadException;

}

要处理的是并行任务,需要用到CountDownLatch来统计所有子线程执行结束,还要一个集合记录所有任务,另外加上我自定义的ChildThreadException类来记录子线程中的异常,通知主线程是否所有子线程都执行成功,便得到了下面这个抽象类AbstractMultiParallelThreadHandler。在这个类中,我顺便完成了addTask这个方法。

packagecom.zyj.thread.parallel;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;importcom.zyj.thread.MultiThreadHandler;/*** 并行线程处理

*@authorzengyuanjun*/

public abstract class AbstractMultiParallelThreadHandler implementsMultiThreadHandler {/*** 子线程倒计数锁*/

protectedCountDownLatch childLatch;/*** 任务列表*/

protected ListtaskList;/*** 子线程异常*/

protectedChildThreadException childThreadException;publicAbstractMultiParallelThreadHandler() {

taskList= new ArrayList();

childThreadException= newChildThreadException();

}public voidsetCountDownLatch(CountDownLatch latch) {this.childLatch =latch;

}/*** {@inheritDoc}*/@Overridepublic voidaddTask(Runnable... tasks) {if (null ==tasks) {

taskList= new ArrayList();

}for(Runnable task : tasks) {

taskList.add(task);

}

}/*** {@inheritDoc}*/@Overridepublic abstract void run() throwsChildThreadException;

}

具体的实现,则是下面这个类。实现原理也很简单,主线程根据并行任务数创建一个CountDownLatch,传到子线程中,并运行所有子线程,然后await等待。子线程执行结束后调用CountDownLatch的countDown()方法,当所有子线程执行结束后,CountDownLatch计数清零,主线程被唤醒继续执行。

packagecom.zyj.thread.parallel;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;/*** 并行任务处理工具

*

*@authorzengyuanjun

**/

public class MultiParallelThreadHandler extendsAbstractMultiParallelThreadHandler {/*** 无参构造器*/

publicMultiParallelThreadHandler() {super();

}/*** 根据任务数量运行任务*/@Overridepublic void run() throwsChildThreadException {if (null == taskList || taskList.size() == 0) {return;

}else if (taskList.size() == 1) {

runWithoutNewThread();

}else if (taskList.size() > 1) {

runInNewThread();

}

}/*** 新建线程运行任务

*

*@throwsChildThreadException*/

private void runInNewThread() throwsChildThreadException {

childLatch= newCountDownLatch(taskList.size());

childThreadException.clearExceptionList();for(Runnable task : taskList) {

invoke(new MultiParallelRunnable(newMultiParallelContext(task, childLatch, childThreadException)));

}

taskList.clear();try{

childLatch.await();

}catch(InterruptedException e) {

childThreadException.addException(e);

}

throwChildExceptionIfRequired();

}/*** 默认线程执行方法

*

*@paramcommand*/

protected voidinvoke(Runnable command) {if(command.getClass().isAssignableFrom(Thread.class)){

Thread.class.cast(command).start();

}else{newThread(command).start();

}

}/*** 在当前线程中直接运行

*

*@throwsChildThreadException*/

private void runWithoutNewThread() throwsChildThreadException {try{

taskList.get(0).run();

}catch(Exception e) {

childThreadException.addException(e);

}

throwChildExceptionIfRequired();

}/*** 根据需要抛出子线程异常

*

*@throwsChildThreadException*/

private void throwChildExceptionIfRequired() throwsChildThreadException {if(childThreadException.hasException()) {

childExceptionHandler(childThreadException);

}

}/*** 默认抛出子线程异常

*@parame

*@throwsChildThreadException*/

protected void childExceptionHandler(ChildThreadException e) throwsChildThreadException {throwe;

}

}

并行任务是要运行的子线程,只要实现Runnable接口就行,并没有CountDownLatch对象,所以我用MultiParallelRunnable类对它封装一次,MultiParallelRunnable类里有个属性叫 MultiParallelContext,MultiParallelContext里面就是保存的子线程task、倒计数锁CountDownLatch和ChildThreadException这些参数。MultiParallelRunnable类完成运行子线程、记录子线程异常和倒计数锁减一。

packagecom.zyj.thread.parallel;/*** 并行线程对象

*

*@authorzengyuanjun

**/

public class MultiParallelRunnable implementsRunnable {/*** 并行任务参数*/

privateMultiParallelContext context;/*** 构造函数

*@paramcontext*/

publicMultiParallelRunnable(MultiParallelContext context) {this.context =context;

}/*** 运行任务*/@Overridepublic voidrun() {try{

context.getTask().run();

}catch(Exception e) {

e.printStackTrace();

context.getChildException().addException(e);

}finally{

context.getChildLatch().countDown();

}

}

}

packagecom.zyj.thread.parallel;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;/*** 并行任务参数

*@authorzengyuanjun

**/

public classMultiParallelContext {/*** 运行的任务*/

privateRunnable task;/*** 子线程倒计数锁*/

privateCountDownLatch childLatch;/*** 子线程异常*/

privateChildThreadException childException;publicMultiParallelContext() {

}publicMultiParallelContext(Runnable task, CountDownLatch childLatch, ChildThreadException childException) {this.task =task;this.childLatch =childLatch;this.childException =childException;

}publicRunnable getTask() {returntask;

}public voidsetTask(Runnable task) {this.task =task;

}publicCountDownLatch getChildLatch() {returnchildLatch;

}public voidsetChildLatch(CountDownLatch childLatch) {this.childLatch =childLatch;

}publicChildThreadException getChildException() {returnchildException;

}public voidsetChildException(ChildThreadException childException) {this.childException =childException;

}

}

这里提一下ChildThreadException这个自定义异常,跟普通异常不一样,我在里面加了个List exceptionList,用来保存子线程的异常。因为有多个子线程,抛出的异常可能有多个。

packagecom.zyj.exception;importjava.io.PrintStream;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;importcom.zyj.exception.util.ExceptionMessageFormat;importcom.zyj.exception.util.factory.ExceptionMsgFormatFactory;/*** 子线程异常,子线程出现异常时抛出

*@authorzengyuanjun*/

public class ChildThreadException extendsException {/*** serialVersionUID*/

private static final long serialVersionUID = 5682825039992529875L;/*** 子线程的异常列表*/

private ListexceptionList;/*** 异常信息格式化工具*/

privateExceptionMessageFormat formatter;/*** 锁*/

privateLock lock;publicChildThreadException() {super();

initial();

}publicChildThreadException(String message) {super(message);

initial();

}publicChildThreadException(String message, StackTraceElement[] stackTrace) {this(message);

setStackTrace(stackTrace);

}private voidinitial() {

exceptionList= new ArrayList();

lock= newReentrantLock();

formatter=ExceptionMsgFormatFactory.getInstance().getFormatter(ExceptionMsgFormatFactory.STACK_TRACE);

}/*** 子线程是否有异常

*@return

*/

public booleanhasException() {return exceptionList.size() > 0;

}/*** 添加子线程的异常

*@parame*/

public voidaddException(Exception e) {try{

lock.lock();

e.setStackTrace(e.getStackTrace());

exceptionList.add(e);

}finally{

lock.unlock();

}

}/*** 获取子线程的异常列表

*@return

*/

public ListgetExceptionList() {returnexceptionList;

}/*** 清空子线程的异常列表*/

public voidclearExceptionList() {

exceptionList.clear();

}/*** 获取所有子线程异常的堆栈跟踪信息

*@return

*/

publicString getAllStackTraceMessage() {

StringBuffer sb= newStringBuffer();for(Exception e : exceptionList) {

sb.append(e.getClass().getName());

sb.append(": ");

sb.append(e.getMessage());

sb.append("\n");

sb.append(formatter.formate(e));

}returnsb.toString();

}/*** 打印所有子线程的异常的堆栈跟踪信息*/

public voidprintAllStackTrace() {

printAllStackTrace(System.err);

}/*** 打印所有子线程的异常的堆栈跟踪信息

*@params*/

public voidprintAllStackTrace(PrintStream s) {for(Exception e : exceptionList) {

e.printStackTrace(s);

}

}

}

·有没有问题试一下才知道,写了个类来测试:TestCase 为并行任务子线程,resultMap为并行任务共同完成的结果集。假设resultMap由5部分组成,main方法中启动5个子线程分别完成一个部分,等5个子线程处理完后,main方法将结果resultMap打印出来。

packagecom.zyj.thread.test;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importcom.zyj.exception.ChildThreadException;importcom.zyj.thread.MultiThreadHandler;importcom.zyj.thread.parallel.MultiParallelThreadHandler;importcom.zyj.thread.parallel.ParallelTaskWithThreadPool;public class TestCase implementsRunnable {privateString name;private Mapresult;public TestCase(String name, Mapresult) {this.name =name;this.result =result;

}

@Overridepublic voidrun() {//模拟线程执行1000ms

try{

Thread.sleep(1000);

}catch(InterruptedException e) {

e.printStackTrace();

}//模拟线程1和线程3抛出异常//if(name.equals("1") || name.equals("3"))//throw new RuntimeException(name + ": throw exception");

result.put(name, "complete part " + name + "!");

}public static voidmain(String[] args) {

System.out.println("main begin \t=================");

Map resultMap = new HashMap(8, 1);

MultiThreadHandler handler= newMultiParallelThreadHandler();//ExecutorService service = Executors.newFixedThreadPool(3);//MultiThreadHandler handler = new ParallelTaskWithThreadPool(service);

TestCase task = null;//启动5个子线程作为要处理的并行任务,共同完成结果集resultMap

for(int i&#61;1; i<&#61;5 ; i&#43;&#43;){

task&#61; new TestCase("" &#43;i, resultMap);

handler.addTask(task);

}try{

handler.run();

}catch(ChildThreadException e) {

System.out.println(e.getAllStackTraceMessage());

}

System.out.println(resultMap);//service.shutdown();

System.out.println("main end \t&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;");

}

}

运行main方法&#xff0c;测试结果如下

main begin &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;{3&#61;complete part 3!, 2&#61;complete part 2!, 1&#61;complete part 1!, 5&#61;complete part 5!, 4&#61;complete part 4!}

main end&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;

将模拟线程1和线程3抛出异常的注释打开&#xff0c;测试结果如下

8c50cbb23417d38350460dfb170b25f6.png

红色的打印是子线程中捕获异常打印的堆栈跟踪信息&#xff0c;黑色的异常信息是主线程main方法中打印的&#xff0c;这说明主线程能够监视到子线程的出错&#xff0c;以便采取对应的处理。由于线程1和线程3出现了异常&#xff0c;未能完成任务&#xff0c;所以打印的resultMap只有第2、4、5三个部分完成。

为了便于扩展&#xff0c;我把MultiParallelThreadHandler类中的invoke方法和childExceptionHandler方法定义为protected类型。invoke方法中是具体的线程执行&#xff0c;childExceptionHandler方法是子线程抛出异常后的处理&#xff0c;可以去继承&#xff0c;重写为自己想要的&#xff0c;比如我想用线程池去运行子线程&#xff0c;就可以去继承并重写invoke方法&#xff0c;得到下面的这个类

packagecom.zyj.thread.parallel;importjava.util.concurrent.ExecutorService;/*** 使用线程池运行并行任务

*&#64;authorzengyuanjun

**/

public class ParallelTaskWithThreadPool extendsMultiParallelThreadHandler {privateExecutorService service;publicParallelTaskWithThreadPool() {

}publicParallelTaskWithThreadPool(ExecutorService service) {this.service &#61;service;

}publicExecutorService getService() {returnservice;

}public voidsetService(ExecutorService service) {this.service &#61;service;

}/*** 使用线程池运行*/&#64;Overrideprotected voidinvoke(Runnable command) {if(null !&#61;service){

service.execute(command);

}else{super.invoke(command);

}

}

}

测试就在上面的测试类中&#xff0c;只不过被注释掉了&#xff0c;测试结果是一样的&#xff0c;就不多说了。



推荐阅读
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了logistic回归(线性和非线性)相关的知识,包括线性logistic回归的代码和数据集的分布情况。希望对你有一定的参考价值。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • Python爬虫中使用正则表达式的方法和注意事项
    本文介绍了在Python爬虫中使用正则表达式的方法和注意事项。首先解释了爬虫的四个主要步骤,并强调了正则表达式在数据处理中的重要性。然后详细介绍了正则表达式的概念和用法,包括检索、替换和过滤文本的功能。同时提到了re模块是Python内置的用于处理正则表达式的模块,并给出了使用正则表达式时需要注意的特殊字符转义和原始字符串的用法。通过本文的学习,读者可以掌握在Python爬虫中使用正则表达式的技巧和方法。 ... [详细]
  • 【shell】网络处理:判断IP是否在网段、两个ip是否同网段、IP地址范围、网段包含关系
    本文介绍了使用shell脚本判断IP是否在同一网段、判断IP地址是否在某个范围内、计算IP地址范围、判断网段之间的包含关系的方法和原理。通过对IP和掩码进行与计算,可以判断两个IP是否在同一网段。同时,还提供了一段用于验证IP地址的正则表达式和判断特殊IP地址的方法。 ... [详细]
  • 本文介绍了Java集合库的使用方法,包括如何方便地重复使用集合以及下溯造型的应用。通过使用集合库,可以方便地取用各种集合,并将其插入到自己的程序中。为了使集合能够重复使用,Java提供了一种通用类型,即Object类型。通过添加指向集合的对象句柄,可以实现对集合的重复使用。然而,由于集合只能容纳Object类型,当向集合中添加对象句柄时,会丢失其身份或标识信息。为了恢复其本来面貌,可以使用下溯造型。本文还介绍了Java 1.2集合库的特点和优势。 ... [详细]
  • 本文介绍了使用readlink命令获取文件的完整路径的简单方法,并提供了一个示例命令来打印文件的完整路径。共有28种解决方案可供选择。 ... [详细]
author-avatar
WingKeii-
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有