热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

java多线程执行任务_java多线程处理并行任务

在多线程编程过程中,遇到这样的情况,主线程需要等待多个子线程的处理结果,才能继续运行下去。个人给这样的子线程任务取了个名字叫并行任务。对于

在多线程编程过程中,遇到这样的情况,主线程需要等待多个子线程的处理结果,才能继续运行下去。个人给这样的子线程任务取了个名字叫并行任务。对于这种任务,每次去编写代码加锁控制时序,觉得太麻烦,正好朋友提到CountDownLatch这个类,于是用它来编写了个小工具。

首先,要处理的是多个任务,于是定义了一个接口

packagecom.zyj.thread;importcom.zyj.exception.ChildThreadException;/*** 多任务处理

*@authorzengyuanjun*/

public interfaceMultiThreadHandler {/*** 添加任务

*@paramtasks*/

voidaddTask(Runnable... tasks);/*** 执行任务

*@throwsChildThreadException*/

void run() throwsChildThreadException;

}

要处理的是并行任务,需要用到CountDownLatch来统计所有子线程执行结束,还要一个集合记录所有任务,另外加上我自定义的ChildThreadException类来记录子线程中的异常,通知主线程是否所有子线程都执行成功,便得到了下面这个抽象类AbstractMultiParallelThreadHandler。在这个类中,我顺便完成了addTask这个方法。

packagecom.zyj.thread.parallel;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;importcom.zyj.thread.MultiThreadHandler;/*** 并行线程处理

*@authorzengyuanjun*/

public abstract class AbstractMultiParallelThreadHandler implementsMultiThreadHandler {/*** 子线程倒计数锁*/

protectedCountDownLatch childLatch;/*** 任务列表*/

protected ListtaskList;/*** 子线程异常*/

protectedChildThreadException childThreadException;publicAbstractMultiParallelThreadHandler() {

taskList= new ArrayList();

childThreadException= newChildThreadException();

}public voidsetCountDownLatch(CountDownLatch latch) {this.childLatch =latch;

}/*** {@inheritDoc}*/@Overridepublic voidaddTask(Runnable... tasks) {if (null ==tasks) {

taskList= new ArrayList();

}for(Runnable task : tasks) {

taskList.add(task);

}

}/*** {@inheritDoc}*/@Overridepublic abstract void run() throwsChildThreadException;

}

具体的实现,则是下面这个类。实现原理也很简单,主线程根据并行任务数创建一个CountDownLatch,传到子线程中,并运行所有子线程,然后await等待。子线程执行结束后调用CountDownLatch的countDown()方法,当所有子线程执行结束后,CountDownLatch计数清零,主线程被唤醒继续执行。

packagecom.zyj.thread.parallel;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;/*** 并行任务处理工具

*

*@authorzengyuanjun

**/

public class MultiParallelThreadHandler extendsAbstractMultiParallelThreadHandler {/*** 无参构造器*/

publicMultiParallelThreadHandler() {super();

}/*** 根据任务数量运行任务*/@Overridepublic void run() throwsChildThreadException {if (null == taskList || taskList.size() == 0) {return;

}else if (taskList.size() == 1) {

runWithoutNewThread();

}else if (taskList.size() > 1) {

runInNewThread();

}

}/*** 新建线程运行任务

*

*@throwsChildThreadException*/

private void runInNewThread() throwsChildThreadException {

childLatch= newCountDownLatch(taskList.size());

childThreadException.clearExceptionList();for(Runnable task : taskList) {

invoke(new MultiParallelRunnable(newMultiParallelContext(task, childLatch, childThreadException)));

}

taskList.clear();try{

childLatch.await();

}catch(InterruptedException e) {

childThreadException.addException(e);

}

throwChildExceptionIfRequired();

}/*** 默认线程执行方法

*

*@paramcommand*/

protected voidinvoke(Runnable command) {if(command.getClass().isAssignableFrom(Thread.class)){

Thread.class.cast(command).start();

}else{newThread(command).start();

}

}/*** 在当前线程中直接运行

*

*@throwsChildThreadException*/

private void runWithoutNewThread() throwsChildThreadException {try{

taskList.get(0).run();

}catch(Exception e) {

childThreadException.addException(e);

}

throwChildExceptionIfRequired();

}/*** 根据需要抛出子线程异常

*

*@throwsChildThreadException*/

private void throwChildExceptionIfRequired() throwsChildThreadException {if(childThreadException.hasException()) {

childExceptionHandler(childThreadException);

}

}/*** 默认抛出子线程异常

*@parame

*@throwsChildThreadException*/

protected void childExceptionHandler(ChildThreadException e) throwsChildThreadException {throwe;

}

}

并行任务是要运行的子线程,只要实现Runnable接口就行,并没有CountDownLatch对象,所以我用MultiParallelRunnable类对它封装一次,MultiParallelRunnable类里有个属性叫 MultiParallelContext,MultiParallelContext里面就是保存的子线程task、倒计数锁CountDownLatch和ChildThreadException这些参数。MultiParallelRunnable类完成运行子线程、记录子线程异常和倒计数锁减一。

packagecom.zyj.thread.parallel;/*** 并行线程对象

*

*@authorzengyuanjun

**/

public class MultiParallelRunnable implementsRunnable {/*** 并行任务参数*/

privateMultiParallelContext context;/*** 构造函数

*@paramcontext*/

publicMultiParallelRunnable(MultiParallelContext context) {this.context =context;

}/*** 运行任务*/@Overridepublic voidrun() {try{

context.getTask().run();

}catch(Exception e) {

e.printStackTrace();

context.getChildException().addException(e);

}finally{

context.getChildLatch().countDown();

}

}

}

packagecom.zyj.thread.parallel;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;/*** 并行任务参数

*@authorzengyuanjun

**/

public classMultiParallelContext {/*** 运行的任务*/

privateRunnable task;/*** 子线程倒计数锁*/

privateCountDownLatch childLatch;/*** 子线程异常*/

privateChildThreadException childException;publicMultiParallelContext() {

}publicMultiParallelContext(Runnable task, CountDownLatch childLatch, ChildThreadException childException) {this.task =task;this.childLatch =childLatch;this.childException =childException;

}publicRunnable getTask() {returntask;

}public voidsetTask(Runnable task) {this.task =task;

}publicCountDownLatch getChildLatch() {returnchildLatch;

}public voidsetChildLatch(CountDownLatch childLatch) {this.childLatch =childLatch;

}publicChildThreadException getChildException() {returnchildException;

}public voidsetChildException(ChildThreadException childException) {this.childException =childException;

}

}

这里提一下ChildThreadException这个自定义异常,跟普通异常不一样,我在里面加了个List exceptionList,用来保存子线程的异常。因为有多个子线程,抛出的异常可能有多个。

packagecom.zyj.exception;importjava.io.PrintStream;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;importcom.zyj.exception.util.ExceptionMessageFormat;importcom.zyj.exception.util.factory.ExceptionMsgFormatFactory;/*** 子线程异常,子线程出现异常时抛出

*@authorzengyuanjun*/

public class ChildThreadException extendsException {/*** serialVersionUID*/

private static final long serialVersionUID = 5682825039992529875L;/*** 子线程的异常列表*/

private ListexceptionList;/*** 异常信息格式化工具*/

privateExceptionMessageFormat formatter;/*** 锁*/

privateLock lock;publicChildThreadException() {super();

initial();

}publicChildThreadException(String message) {super(message);

initial();

}publicChildThreadException(String message, StackTraceElement[] stackTrace) {this(message);

setStackTrace(stackTrace);

}private voidinitial() {

exceptionList= new ArrayList();

lock= newReentrantLock();

formatter=ExceptionMsgFormatFactory.getInstance().getFormatter(ExceptionMsgFormatFactory.STACK_TRACE);

}/*** 子线程是否有异常

*@return

*/

public booleanhasException() {return exceptionList.size() > 0;

}/*** 添加子线程的异常

*@parame*/

public voidaddException(Exception e) {try{

lock.lock();

e.setStackTrace(e.getStackTrace());

exceptionList.add(e);

}finally{

lock.unlock();

}

}/*** 获取子线程的异常列表

*@return

*/

public ListgetExceptionList() {returnexceptionList;

}/*** 清空子线程的异常列表*/

public voidclearExceptionList() {

exceptionList.clear();

}/*** 获取所有子线程异常的堆栈跟踪信息

*@return

*/

publicString getAllStackTraceMessage() {

StringBuffer sb= newStringBuffer();for(Exception e : exceptionList) {

sb.append(e.getClass().getName());

sb.append(": ");

sb.append(e.getMessage());

sb.append("\n");

sb.append(formatter.formate(e));

}returnsb.toString();

}/*** 打印所有子线程的异常的堆栈跟踪信息*/

public voidprintAllStackTrace() {

printAllStackTrace(System.err);

}/*** 打印所有子线程的异常的堆栈跟踪信息

*@params*/

public voidprintAllStackTrace(PrintStream s) {for(Exception e : exceptionList) {

e.printStackTrace(s);

}

}

}

·有没有问题试一下才知道,写了个类来测试:TestCase 为并行任务子线程,resultMap为并行任务共同完成的结果集。假设resultMap由5部分组成,main方法中启动5个子线程分别完成一个部分,等5个子线程处理完后,main方法将结果resultMap打印出来。

packagecom.zyj.thread.test;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importcom.zyj.exception.ChildThreadException;importcom.zyj.thread.MultiThreadHandler;importcom.zyj.thread.parallel.MultiParallelThreadHandler;importcom.zyj.thread.parallel.ParallelTaskWithThreadPool;public class TestCase implementsRunnable {privateString name;private Mapresult;public TestCase(String name, Mapresult) {this.name =name;this.result =result;

}

@Overridepublic voidrun() {//模拟线程执行1000ms

try{

Thread.sleep(1000);

}catch(InterruptedException e) {

e.printStackTrace();

}//模拟线程1和线程3抛出异常//if(name.equals("1") || name.equals("3"))//throw new RuntimeException(name + ": throw exception");

result.put(name, "complete part " + name + "!");

}public static voidmain(String[] args) {

System.out.println("main begin \t=================");

Map resultMap = new HashMap(8, 1);

MultiThreadHandler handler= newMultiParallelThreadHandler();//ExecutorService service = Executors.newFixedThreadPool(3);//MultiThreadHandler handler = new ParallelTaskWithThreadPool(service);

TestCase task = null;//启动5个子线程作为要处理的并行任务,共同完成结果集resultMap

for(int i&#61;1; i<&#61;5 ; i&#43;&#43;){

task&#61; new TestCase("" &#43;i, resultMap);

handler.addTask(task);

}try{

handler.run();

}catch(ChildThreadException e) {

System.out.println(e.getAllStackTraceMessage());

}

System.out.println(resultMap);//service.shutdown();

System.out.println("main end \t&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;");

}

}

运行main方法&#xff0c;测试结果如下

main begin &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;{3&#61;complete part 3!, 2&#61;complete part 2!, 1&#61;complete part 1!, 5&#61;complete part 5!, 4&#61;complete part 4!}

main end&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;

将模拟线程1和线程3抛出异常的注释打开&#xff0c;测试结果如下

8c50cbb23417d38350460dfb170b25f6.png

红色的打印是子线程中捕获异常打印的堆栈跟踪信息&#xff0c;黑色的异常信息是主线程main方法中打印的&#xff0c;这说明主线程能够监视到子线程的出错&#xff0c;以便采取对应的处理。由于线程1和线程3出现了异常&#xff0c;未能完成任务&#xff0c;所以打印的resultMap只有第2、4、5三个部分完成。

为了便于扩展&#xff0c;我把MultiParallelThreadHandler类中的invoke方法和childExceptionHandler方法定义为protected类型。invoke方法中是具体的线程执行&#xff0c;childExceptionHandler方法是子线程抛出异常后的处理&#xff0c;可以去继承&#xff0c;重写为自己想要的&#xff0c;比如我想用线程池去运行子线程&#xff0c;就可以去继承并重写invoke方法&#xff0c;得到下面的这个类

packagecom.zyj.thread.parallel;importjava.util.concurrent.ExecutorService;/*** 使用线程池运行并行任务

*&#64;authorzengyuanjun

**/

public class ParallelTaskWithThreadPool extendsMultiParallelThreadHandler {privateExecutorService service;publicParallelTaskWithThreadPool() {

}publicParallelTaskWithThreadPool(ExecutorService service) {this.service &#61;service;

}publicExecutorService getService() {returnservice;

}public voidsetService(ExecutorService service) {this.service &#61;service;

}/*** 使用线程池运行*/&#64;Overrideprotected voidinvoke(Runnable command) {if(null !&#61;service){

service.execute(command);

}else{super.invoke(command);

}

}

}

测试就在上面的测试类中&#xff0c;只不过被注释掉了&#xff0c;测试结果是一样的&#xff0c;就不多说了。



推荐阅读
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • 本文将详细介绍在Android应用中添加自定义返回按钮的方法,帮助开发者更好地理解和实现这一功能。通过具体的代码示例和步骤说明,本文旨在为初学者提供清晰的指导,确保他们在开发过程中能够顺利集成返回按钮,提升用户体验。 ... [详细]
  • 本项目在Java Maven框架下,利用POI库实现了Excel数据的高效导入与导出功能。通过优化数据处理流程,提升了数据操作的性能和稳定性。项目已发布至GitHub,当前最新版本为0.0.5。该项目不仅适用于小型应用,也可扩展用于大型企业级系统,提供了灵活的数据管理解决方案。GitHub地址:https://github.com/83945105/holygrail,Maven坐标:`com.github.83945105:holygrail:0.0.5`。 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 本文介绍了如何通过掌握 IScroll 技巧来实现流畅的上拉加载和下拉刷新功能。首先,需要按正确的顺序引入相关文件:1. Zepto;2. iScroll.js;3. scroll-probe.js。此外,还提供了完整的代码示例,可在 GitHub 仓库中查看。通过这些步骤,开发者可以轻松实现高效、流畅的滚动效果,提升用户体验。 ... [详细]
  • 在Spring与Ibatis集成的环境中,通过Spring AOP配置事务管理至服务层。当在一个服务方法中引入自定义多线程时,发现事务管理功能失效。若不使用多线程,事务管理则能正常工作。本文深入分析了这一现象背后的潜在风险,并探讨了可能的解决方案,以确保事务一致性和线程安全。 ... [详细]
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • 本文探讨了在Android应用中实现动态滚动文本显示控件的优化方法。通过详细分析焦点管理机制,特别是通过设置返回值为`true`来确保焦点不会被其他控件抢占,从而提升滚动文本的流畅性和用户体验。具体实现中,对`MarqueeText.java`进行了代码层面的优化,增强了控件的稳定性和兼容性。 ... [详细]
  • 全面解析Java虚拟机:内存模型深度剖析 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 深入解析零拷贝技术(Zerocopy)及其应用优势
    零拷贝技术(Zero-copy)是Netty框架中的一个关键特性,其核心在于减少数据在操作系统内核与用户空间之间的传输次数。通过避免不必要的内存复制操作,零拷贝显著提高了数据传输的效率和性能。本文将深入探讨零拷贝的工作原理及其在实际应用中的优势,包括降低CPU负载、减少内存带宽消耗以及提高系统吞吐量等方面。 ... [详细]
  • 本文深入探讨了IO复用技术的原理与实现,重点分析了其在解决C10K问题中的关键作用。IO复用技术允许单个进程同时管理多个IO对象,如文件、套接字和管道等,通过系统调用如`select`、`poll`和`epoll`,高效地处理大量并发连接。文章详细介绍了这些技术的工作机制,并结合实际案例,展示了它们在高并发场景下的应用效果。 ... [详细]
  • 如何构建和部署C# Windows服务应用程序
    本文介绍了如何从零开始构建和部署C# Windows服务应用程序。通过详细步骤和代码示例,帮助读者掌握创建、配置和部署Windows服务的关键技术点,适合初学者和有经验的开发人员参考。 ... [详细]
author-avatar
WingKeii-
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有