热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java并发编程札记(六)JUC线程池02ThreadPoolExecutor实现原理

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。简介为什么要使用线程池许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。


简介

为什么要使用线程池
许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请求,就创建一个线程来处理,表面看是没有问题的,但实际上存在着很严重的缺陷。服务器应用程序中经常出现的情况是请求处理的任务很简单但客户端的数目却是庞大的,这种情况下如果还是每收到一个请求就创建一个线程来处理它,服务器在创建和销毁线程所花费的时间和资源可能比处理客户端请求处理的任务花费的时间和资源更多。为了缓解服务器压力,需要解决频繁创建和销毁线程的问题。线程池可以实现这个需求。

什么是线程池
线程池可以看做是许多线程的集合。在没有任务时线程处于空闲状态,当请求到来,线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务。这样就实现了线程的重用。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。

工作模型
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/workModel.png

工作模型中一共有三种队列:正在执行的任务队列,等待被执行的阻塞队列,等待被commit进阻塞队列中的任务队列。

Java中的线程池
Java中常用的线程池有三个,最出名的当然是ThreadPoolExecutor,除此之外还有ScheduledThreadPoolExecutor、ForkJoinPool。本文主要学习ThreadPoolExecutor的实现原理。


创建ThreadPoolExecutor线程池

强烈推荐使用Executors工厂方法创建线程池,如Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。本文主要学习如何手动配置,下面是ThreadPoolExecutor的一个构造方法。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize <0 ||maximumPoolSize <&#61; 0 ||maximumPoolSize 0)throw new IllegalArgumentException();if (workQueue &#61;&#61; null || threadFactory &#61;&#61; null || handler &#61;&#61; null)throw new NullPointerException();this.corePoolSize &#61; corePoolSize;this.maximumPoolSize &#61; maximumPoolSize;this.workQueue &#61; workQueue;this.keepAliveTime &#61; unit.toNanos(keepAliveTime);this.threadFactory &#61; threadFactory;this.handler &#61; handler;
}

ThreadPoolExecutor一共有四个构造方法&#xff0c;其他三个构造方法都是通过上述的构造方法来实现的。毫无疑问手动配置线程池的关键就是学好构造方法中的几个参数如何设置。这几个参数对应着ThreadPoolExecutor中的几个成员属性。


属性

corePoolSize与maximumPoolSize分别是核心池大小与最大池大小。在源码中的声明为
private volatile int corePoolSize;private volatile int maximumPoolSize;

当新任务在方法 execute(java.lang.Runnable) 中提交时&#xff0c;如果运行的线程少于 corePoolSize&#xff0c;则创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize&#xff0c;则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同&#xff0c;则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值&#xff08;如 Integer.MAX_VALUE&#xff09;&#xff0c;则允许池适应任意数量的并发任务。在大多数情况下&#xff0c;核心和最大池大小仅基于构造来设置&#xff0c;不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

workQueue是线程池工作模型中的阻塞队列&#xff0c;用于传输和保持提交的任务。在源码中的声明为private final BlockingQueue workQueue;

keepAliveTime是池中线程空闲时的活动时间。如果池中当前有多于 corePoolSize 的线程&#xff0c;则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止&#xff08;参见 getKeepAliveTime(java.util.concurrent.TimeUnit)&#xff09;。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动&#xff0c;则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下&#xff0c;保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0&#xff0c; allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

threadFactory是一个线程集合。线程池可以使用ThreadFactory创建新线程。如果没有另外说明&#xff0c;则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程&#xff0c;并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory&#xff0c;可以改变线程的名称、线程组、优先级、守护进程状态&#xff0c;等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程&#xff0c;则执行程序将继续运行&#xff0c;但不能执行任何任务。

handler是线程池拒绝策略&#xff0c;RejectedExecutionHandler类型的对象。当 Executor 已经关闭&#xff0c;并且 Executor 将有限边界用于最大线程和工作队列容量&#xff0c;且已经饱和时&#xff0c;在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下&#xff0c; execute 方法都将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略&#xff1a;


  • ThreadPoolExecutor.AbortPolicy &#xff0c;默认策略&#xff0c;处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy&#xff0c;线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制&#xff0c;能够减缓新任务的提交速度。
  • ThreadPoolExecutor.DiscardPolicy&#xff0c;不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy&#xff0c;如果执行程序尚未关闭&#xff0c;则位于工作队列头部的任务将被删除&#xff0c;然后重试执行程序&#xff08;如果再次失败&#xff0c;则重复此过程&#xff09;。

除了上面的几个属性外&#xff0c;ThreadPoolExecutor还有下面的几个参数。

//ctl是一个AtomicInteger类型的原子对象。ctl记录了线程池中的任务数量和线程池状态。
private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
//线程池的锁
private final ReentrantLock mainLock &#61; new ReentrantLock();
//所有工作的线程
private final HashSet workers &#61; new HashSet();
//支持的等待condition
private final Condition termination &#61; mainLock.newCondition();
//线程池中线程数量曾经达到过的最大值
private int largestPoolSize;
//已完成任务数量
private long completedTaskCount;
//是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;

排队策略

排队有三种通用策略&#xff1a;
SynchronousQueue
它将任务直接传输给工作队列workers&#xff0c;而不保持任务。如果不存在空闲线程&#xff0c;则会新建一个线程来执行任务。比如&#xff0c;在Executors.newCachedThreadPool()方法中使用的就是此策略。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());
}

LinkedBlockingQueue
有界队列&#xff0c;使用此队列会导致在所有corePoolSize线程都忙时新任务在队列中等待。这样&#xff0c;创建的线程就不会超过corePoolSize。比如&#xff0c;在Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()方法中使用的就是此策略。

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}

ArrayBlockingQueue
有界队列&#xff0c;没见到在哪里用到了这种策略。


线程池状态

源码已经告诉了我们线程池有几个状态。

private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS &#61; Integer.SIZE - 3;
private static final int CAPACITY &#61; (1 <1;// runState is stored in the high-order bits
private static final int RUNNING &#61; -1 <private static final int SHUTDOWN &#61; 0 <private static final int STOP &#61; 1 <private static final int TIDYING &#61; 2 <private static final int TERMINATED &#61; 3 <

可以看出&#xff0c;一共有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。ctl对象一共32位&#xff0c;高3位保存线程池状态信息&#xff0c;后29位保存线程池容量信息。线程池的初始化状态是RUNNING&#xff0c;在源码中体现为private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));


状态高三位工作队列workers中的任务阻塞队列workQueue中的任务未添加的任务
RUNNING111继续处理继续处理添加
SHUTDOWN000继续处理继续处理不添加
STOP001尝试中断不处理不添加
TIDYING010处理完了如果由SHUTDOWN - TIDYING &#xff0c;那就是处理完了&#xff1b;如果由STOP - TIDYING &#xff0c;那就是不处理不添加
TERMINATED011同TIDYING同TIDYING同TIDYING

各个状态的转换图如下所示
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/status.png


执行任务

execute(Runnable)

/** 在将来某个时间执行给定任务。* 可以在新线程中或者在现有池线程中执行该任务。* 如果无法将任务提交执行&#xff0c;或者因为此执行程序已关闭&#xff0c;或者因为已达到其容量&#xff0c;* 则该任务由当前 RejectedExecutionHandler 处理。*/
public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();int c &#61; ctl.get();//分三种情况处理//case1&#xff1a;如果线程池中运行的线程数量if (workerCountOf(c) //创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的if (addWorker(command, true))return;c &#61; ctl.get();}//case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();//再次检查线程池的状态&#xff0c;如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务if (! isRunning(recheck) && remove(command))//该任务由当前 RejectedExecutionHandler 处理reject(command);//如果线程池中运行的线程数量为0else if (workerCountOf(recheck) &#61;&#61; 0)//则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。addWorker(null, false);}//case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败else if (!addWorker(command, false))//该任务由当前 RejectedExecutionHandler 处理reject(command);
}

看完后&#xff0c;我们知道execute()分三种情况处理任务

case1&#xff1a;如果线程池中运行的线程数量 case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中&#xff0c;就再次检查线程池的状态&#xff0c;1.如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。2.否则如果线程池中运行的线程数量为0&#xff0c;则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。
case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。

submit

/*** 提交一个 Runnable 任务用于执行&#xff0c;并返回一个表示该任务的 Future。* 该 Future 的 get 方法在 成功 完成时将会返回 null。*/
public Future submit(Runnable task) {if (task &#61;&#61; null) throw new NullPointerException();RunnableFuture ftask &#61; newTaskFor(task, null);execute(ftask);return ftask;
}

可以看到此方法是通过调用execute(Runnable)实现的。


关闭线程池

ThreadPoolExecutor提供了shutdown()和shutdownNow()两个方法来关闭线程池。shutdown() 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;但是不接受新任务。shutdownNow()尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。

shutdown()

/** 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;不接受新任务。* 如果已经关闭&#xff0c;则调用没有其他作用。*/
public void shutdown() {final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.将线程池状态设为SHUTDOWNadvanceRunState(SHUTDOWN);//step4.中断所有空闲线程interruptIdleWorkers();//step5.钩子函数&#xff0c;没有执行任何操作onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();
}

将shutdown()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.将线程池状态设为SHUTDOWN。一旦将线程池状态设为SHUTDOWN&#xff0c;就不能像线程池中添加新任务了。
step4.中断所有空闲线程
step5.钩子函数&#xff0c;没有执行任何操作
step6.释放锁
step7.将线程池状态设置为TERMINATED

shutdownNow()

/** 尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。* /
public List shutdownNow() {List tasks;final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.尝试停止所有的活动执行任务advanceRunState(STOP);//step4.暂停等待任务的处理interruptWorkers();//step5.获取等待执行的任务列表tasks &#61; drainQueue();} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();//step8.返回等待执行的任务列表return tasks;
}

将shutdownNow()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.尝试停止所有的活动执行任务
step4.暂停等待任务的处理
step5.获取等待执行的任务列表
step6.释放锁
step7.将线程池状态设置为TERMINATED
step8.返回等待执行的任务列表

shutdown()和shutdownNow()的区别


  • 调用shutdown()后&#xff0c;线程池状态立刻变为SHUTDOWN&#xff0c;而调用shutdownNow()&#xff0c;线程池状态立刻变为STOP。
  • shutdown()通过中断空闲线程、不接受新任务的方式按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;shutdownNow()无差别地停止所有的活动执行任务&#xff0c;暂停等待任务的处理。也就是说&#xff0c;shutdown()等待任务执行完才中断线程&#xff0c;而shutdownNow()不等任务执行完就中断了线程。

本文就先讲到这里&#xff0c;想了解Java并发编程更多内容请参考&#xff1a;


  • Java并发编程札记-目录

推荐阅读
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • 在Kubernetes上部署JupyterHub的步骤和实验依赖
    本文介绍了在Kubernetes上部署JupyterHub的步骤和实验所需的依赖,包括安装Docker和K8s,使用kubeadm进行安装,以及更新下载的镜像等。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
author-avatar
刘海龙_高安大城
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有