在多线程编程过程中,遇到这样的情况,主线程需要等待多个子线程的处理结果,才能继续运行下去。个人给这样的子线程任务取了个名字叫并行任务。对于这种任务,每次去编写代码加锁控制时序,觉得太麻烦,正好朋友提到CountDownLatch这个类,于是用它来编写了个小工具。
首先,要处理的是多个任务,于是定义了一个接口
packagecom.zyj.thread;importcom.zyj.exception.ChildThreadException;/*** 多任务处理
*@authorzengyuanjun*/
public interfaceMultiThreadHandler {/*** 添加任务
*@paramtasks*/
voidaddTask(Runnable... tasks);/*** 执行任务
*@throwsChildThreadException*/
void run() throwsChildThreadException;
}
要处理的是并行任务,需要用到CountDownLatch来统计所有子线程执行结束,还要一个集合记录所有任务,另外加上我自定义的ChildThreadException类来记录子线程中的异常,通知主线程是否所有子线程都执行成功,便得到了下面这个抽象类AbstractMultiParallelThreadHandler。在这个类中,我顺便完成了addTask这个方法。
packagecom.zyj.thread.parallel;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;importcom.zyj.thread.MultiThreadHandler;/*** 并行线程处理
*@authorzengyuanjun*/
public abstract class AbstractMultiParallelThreadHandler implementsMultiThreadHandler {/*** 子线程倒计数锁*/
protectedCountDownLatch childLatch;/*** 任务列表*/
protected ListtaskList;/*** 子线程异常*/
protectedChildThreadException childThreadException;publicAbstractMultiParallelThreadHandler() {
taskList= new ArrayList();
childThreadException= newChildThreadException();
}public voidsetCountDownLatch(CountDownLatch latch) {this.childLatch =latch;
}/*** {@inheritDoc}*/@Overridepublic voidaddTask(Runnable... tasks) {if (null ==tasks) {
taskList= new ArrayList();
}for(Runnable task : tasks) {
taskList.add(task);
}
}/*** {@inheritDoc}*/@Overridepublic abstract void run() throwsChildThreadException;
}
具体的实现,则是下面这个类。实现原理也很简单,主线程根据并行任务数创建一个CountDownLatch,传到子线程中,并运行所有子线程,然后await等待。子线程执行结束后调用CountDownLatch的countDown()方法,当所有子线程执行结束后,CountDownLatch计数清零,主线程被唤醒继续执行。
packagecom.zyj.thread.parallel;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;/*** 并行任务处理工具
*
*@authorzengyuanjun
**/
public class MultiParallelThreadHandler extendsAbstractMultiParallelThreadHandler {/*** 无参构造器*/
publicMultiParallelThreadHandler() {super();
}/*** 根据任务数量运行任务*/@Overridepublic void run() throwsChildThreadException {if (null == taskList || taskList.size() == 0) {return;
}else if (taskList.size() == 1) {
runWithoutNewThread();
}else if (taskList.size() > 1) {
runInNewThread();
}
}/*** 新建线程运行任务
*
*@throwsChildThreadException*/
private void runInNewThread() throwsChildThreadException {
childLatch= newCountDownLatch(taskList.size());
childThreadException.clearExceptionList();for(Runnable task : taskList) {
invoke(new MultiParallelRunnable(newMultiParallelContext(task, childLatch, childThreadException)));
}
taskList.clear();try{
childLatch.await();
}catch(InterruptedException e) {
childThreadException.addException(e);
}
throwChildExceptionIfRequired();
}/*** 默认线程执行方法
*
*@paramcommand*/
protected voidinvoke(Runnable command) {if(command.getClass().isAssignableFrom(Thread.class)){
Thread.class.cast(command).start();
}else{newThread(command).start();
}
}/*** 在当前线程中直接运行
*
*@throwsChildThreadException*/
private void runWithoutNewThread() throwsChildThreadException {try{
taskList.get(0).run();
}catch(Exception e) {
childThreadException.addException(e);
}
throwChildExceptionIfRequired();
}/*** 根据需要抛出子线程异常
*
*@throwsChildThreadException*/
private void throwChildExceptionIfRequired() throwsChildThreadException {if(childThreadException.hasException()) {
childExceptionHandler(childThreadException);
}
}/*** 默认抛出子线程异常
*@parame
*@throwsChildThreadException*/
protected void childExceptionHandler(ChildThreadException e) throwsChildThreadException {throwe;
}
}
并行任务是要运行的子线程,只要实现Runnable接口就行,并没有CountDownLatch对象,所以我用MultiParallelRunnable类对它封装一次,MultiParallelRunnable类里有个属性叫 MultiParallelContext,MultiParallelContext里面就是保存的子线程task、倒计数锁CountDownLatch和ChildThreadException这些参数。MultiParallelRunnable类完成运行子线程、记录子线程异常和倒计数锁减一。
packagecom.zyj.thread.parallel;/*** 并行线程对象
*
*@authorzengyuanjun
**/
public class MultiParallelRunnable implementsRunnable {/*** 并行任务参数*/
privateMultiParallelContext context;/*** 构造函数
*@paramcontext*/
publicMultiParallelRunnable(MultiParallelContext context) {this.context =context;
}/*** 运行任务*/@Overridepublic voidrun() {try{
context.getTask().run();
}catch(Exception e) {
e.printStackTrace();
context.getChildException().addException(e);
}finally{
context.getChildLatch().countDown();
}
}
}
packagecom.zyj.thread.parallel;importjava.util.concurrent.CountDownLatch;importcom.zyj.exception.ChildThreadException;/*** 并行任务参数
*@authorzengyuanjun
**/
public classMultiParallelContext {/*** 运行的任务*/
privateRunnable task;/*** 子线程倒计数锁*/
privateCountDownLatch childLatch;/*** 子线程异常*/
privateChildThreadException childException;publicMultiParallelContext() {
}publicMultiParallelContext(Runnable task, CountDownLatch childLatch, ChildThreadException childException) {this.task =task;this.childLatch =childLatch;this.childException =childException;
}publicRunnable getTask() {returntask;
}public voidsetTask(Runnable task) {this.task =task;
}publicCountDownLatch getChildLatch() {returnchildLatch;
}public voidsetChildLatch(CountDownLatch childLatch) {this.childLatch =childLatch;
}publicChildThreadException getChildException() {returnchildException;
}public voidsetChildException(ChildThreadException childException) {this.childException =childException;
}
}
这里提一下ChildThreadException这个自定义异常,跟普通异常不一样,我在里面加了个List exceptionList,用来保存子线程的异常。因为有多个子线程,抛出的异常可能有多个。
packagecom.zyj.exception;importjava.io.PrintStream;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;importcom.zyj.exception.util.ExceptionMessageFormat;importcom.zyj.exception.util.factory.ExceptionMsgFormatFactory;/*** 子线程异常,子线程出现异常时抛出
*@authorzengyuanjun*/
public class ChildThreadException extendsException {/*** serialVersionUID*/
private static final long serialVersionUID = 5682825039992529875L;/*** 子线程的异常列表*/
private ListexceptionList;/*** 异常信息格式化工具*/
privateExceptionMessageFormat formatter;/*** 锁*/
privateLock lock;publicChildThreadException() {super();
initial();
}publicChildThreadException(String message) {super(message);
initial();
}publicChildThreadException(String message, StackTraceElement[] stackTrace) {this(message);
setStackTrace(stackTrace);
}private voidinitial() {
exceptionList= new ArrayList();
lock= newReentrantLock();
formatter=ExceptionMsgFormatFactory.getInstance().getFormatter(ExceptionMsgFormatFactory.STACK_TRACE);
}/*** 子线程是否有异常
*@return
*/
public booleanhasException() {return exceptionList.size() > 0;
}/*** 添加子线程的异常
*@parame*/
public voidaddException(Exception e) {try{
lock.lock();
e.setStackTrace(e.getStackTrace());
exceptionList.add(e);
}finally{
lock.unlock();
}
}/*** 获取子线程的异常列表
*@return
*/
public ListgetExceptionList() {returnexceptionList;
}/*** 清空子线程的异常列表*/
public voidclearExceptionList() {
exceptionList.clear();
}/*** 获取所有子线程异常的堆栈跟踪信息
*@return
*/
publicString getAllStackTraceMessage() {
StringBuffer sb= newStringBuffer();for(Exception e : exceptionList) {
sb.append(e.getClass().getName());
sb.append(": ");
sb.append(e.getMessage());
sb.append("\n");
sb.append(formatter.formate(e));
}returnsb.toString();
}/*** 打印所有子线程的异常的堆栈跟踪信息*/
public voidprintAllStackTrace() {
printAllStackTrace(System.err);
}/*** 打印所有子线程的异常的堆栈跟踪信息
*@params*/
public voidprintAllStackTrace(PrintStream s) {for(Exception e : exceptionList) {
e.printStackTrace(s);
}
}
}
·有没有问题试一下才知道,写了个类来测试:TestCase 为并行任务子线程,resultMap为并行任务共同完成的结果集。假设resultMap由5部分组成,main方法中启动5个子线程分别完成一个部分,等5个子线程处理完后,main方法将结果resultMap打印出来。
packagecom.zyj.thread.test;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importcom.zyj.exception.ChildThreadException;importcom.zyj.thread.MultiThreadHandler;importcom.zyj.thread.parallel.MultiParallelThreadHandler;importcom.zyj.thread.parallel.ParallelTaskWithThreadPool;public class TestCase implementsRunnable {privateString name;private Mapresult;public TestCase(String name, Mapresult) {this.name =name;this.result =result;
}
@Overridepublic voidrun() {//模拟线程执行1000ms
try{
Thread.sleep(1000);
}catch(InterruptedException e) {
e.printStackTrace();
}//模拟线程1和线程3抛出异常//if(name.equals("1") || name.equals("3"))//throw new RuntimeException(name + ": throw exception");
result.put(name, "complete part " + name + "!");
}public static voidmain(String[] args) {
System.out.println("main begin \t=================");
Map resultMap = new HashMap(8, 1);
MultiThreadHandler handler= newMultiParallelThreadHandler();//ExecutorService service = Executors.newFixedThreadPool(3);//MultiThreadHandler handler = new ParallelTaskWithThreadPool(service);
TestCase task = null;//启动5个子线程作为要处理的并行任务,共同完成结果集resultMap
for(int i&#61;1; i<&#61;5 ; i&#43;&#43;){
task&#61; new TestCase("" &#43;i, resultMap);
handler.addTask(task);
}try{
handler.run();
}catch(ChildThreadException e) {
System.out.println(e.getAllStackTraceMessage());
}
System.out.println(resultMap);//service.shutdown();
System.out.println("main end \t&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;");
}
}
运行main方法&#xff0c;测试结果如下
main begin &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;{3&#61;complete part 3!, 2&#61;complete part 2!, 1&#61;complete part 1!, 5&#61;complete part 5!, 4&#61;complete part 4!}
main end&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;
将模拟线程1和线程3抛出异常的注释打开&#xff0c;测试结果如下
红色的打印是子线程中捕获异常打印的堆栈跟踪信息&#xff0c;黑色的异常信息是主线程main方法中打印的&#xff0c;这说明主线程能够监视到子线程的出错&#xff0c;以便采取对应的处理。由于线程1和线程3出现了异常&#xff0c;未能完成任务&#xff0c;所以打印的resultMap只有第2、4、5三个部分完成。
为了便于扩展&#xff0c;我把MultiParallelThreadHandler类中的invoke方法和childExceptionHandler方法定义为protected类型。invoke方法中是具体的线程执行&#xff0c;childExceptionHandler方法是子线程抛出异常后的处理&#xff0c;可以去继承&#xff0c;重写为自己想要的&#xff0c;比如我想用线程池去运行子线程&#xff0c;就可以去继承并重写invoke方法&#xff0c;得到下面的这个类
packagecom.zyj.thread.parallel;importjava.util.concurrent.ExecutorService;/*** 使用线程池运行并行任务
*&#64;authorzengyuanjun
**/
public class ParallelTaskWithThreadPool extendsMultiParallelThreadHandler {privateExecutorService service;publicParallelTaskWithThreadPool() {
}publicParallelTaskWithThreadPool(ExecutorService service) {this.service &#61;service;
}publicExecutorService getService() {returnservice;
}public voidsetService(ExecutorService service) {this.service &#61;service;
}/*** 使用线程池运行*/&#64;Overrideprotected voidinvoke(Runnable command) {if(null !&#61;service){
service.execute(command);
}else{super.invoke(command);
}
}
}
测试就在上面的测试类中&#xff0c;只不过被注释掉了&#xff0c;测试结果是一样的&#xff0c;就不多说了。