热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java并发编程札记(六)JUC线程池02ThreadPoolExecutor实现原理

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。简介为什么要使用线程池许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。


简介

为什么要使用线程池
许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请求,就创建一个线程来处理,表面看是没有问题的,但实际上存在着很严重的缺陷。服务器应用程序中经常出现的情况是请求处理的任务很简单但客户端的数目却是庞大的,这种情况下如果还是每收到一个请求就创建一个线程来处理它,服务器在创建和销毁线程所花费的时间和资源可能比处理客户端请求处理的任务花费的时间和资源更多。为了缓解服务器压力,需要解决频繁创建和销毁线程的问题。线程池可以实现这个需求。

什么是线程池
线程池可以看做是许多线程的集合。在没有任务时线程处于空闲状态,当请求到来,线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务。这样就实现了线程的重用。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。

工作模型
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/workModel.png

工作模型中一共有三种队列:正在执行的任务队列,等待被执行的阻塞队列,等待被commit进阻塞队列中的任务队列。

Java中的线程池
Java中常用的线程池有三个,最出名的当然是ThreadPoolExecutor,除此之外还有ScheduledThreadPoolExecutor、ForkJoinPool。本文主要学习ThreadPoolExecutor的实现原理。


创建ThreadPoolExecutor线程池

强烈推荐使用Executors工厂方法创建线程池,如Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。本文主要学习如何手动配置,下面是ThreadPoolExecutor的一个构造方法。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize <0 ||maximumPoolSize <&#61; 0 ||maximumPoolSize 0)throw new IllegalArgumentException();if (workQueue &#61;&#61; null || threadFactory &#61;&#61; null || handler &#61;&#61; null)throw new NullPointerException();this.corePoolSize &#61; corePoolSize;this.maximumPoolSize &#61; maximumPoolSize;this.workQueue &#61; workQueue;this.keepAliveTime &#61; unit.toNanos(keepAliveTime);this.threadFactory &#61; threadFactory;this.handler &#61; handler;
}

ThreadPoolExecutor一共有四个构造方法&#xff0c;其他三个构造方法都是通过上述的构造方法来实现的。毫无疑问手动配置线程池的关键就是学好构造方法中的几个参数如何设置。这几个参数对应着ThreadPoolExecutor中的几个成员属性。


属性

corePoolSize与maximumPoolSize分别是核心池大小与最大池大小。在源码中的声明为
private volatile int corePoolSize;private volatile int maximumPoolSize;

当新任务在方法 execute(java.lang.Runnable) 中提交时&#xff0c;如果运行的线程少于 corePoolSize&#xff0c;则创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize&#xff0c;则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同&#xff0c;则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值&#xff08;如 Integer.MAX_VALUE&#xff09;&#xff0c;则允许池适应任意数量的并发任务。在大多数情况下&#xff0c;核心和最大池大小仅基于构造来设置&#xff0c;不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

workQueue是线程池工作模型中的阻塞队列&#xff0c;用于传输和保持提交的任务。在源码中的声明为private final BlockingQueue workQueue;

keepAliveTime是池中线程空闲时的活动时间。如果池中当前有多于 corePoolSize 的线程&#xff0c;则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止&#xff08;参见 getKeepAliveTime(java.util.concurrent.TimeUnit)&#xff09;。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动&#xff0c;则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下&#xff0c;保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0&#xff0c; allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

threadFactory是一个线程集合。线程池可以使用ThreadFactory创建新线程。如果没有另外说明&#xff0c;则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程&#xff0c;并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory&#xff0c;可以改变线程的名称、线程组、优先级、守护进程状态&#xff0c;等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程&#xff0c;则执行程序将继续运行&#xff0c;但不能执行任何任务。

handler是线程池拒绝策略&#xff0c;RejectedExecutionHandler类型的对象。当 Executor 已经关闭&#xff0c;并且 Executor 将有限边界用于最大线程和工作队列容量&#xff0c;且已经饱和时&#xff0c;在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下&#xff0c; execute 方法都将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略&#xff1a;


  • ThreadPoolExecutor.AbortPolicy &#xff0c;默认策略&#xff0c;处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy&#xff0c;线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制&#xff0c;能够减缓新任务的提交速度。
  • ThreadPoolExecutor.DiscardPolicy&#xff0c;不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy&#xff0c;如果执行程序尚未关闭&#xff0c;则位于工作队列头部的任务将被删除&#xff0c;然后重试执行程序&#xff08;如果再次失败&#xff0c;则重复此过程&#xff09;。

除了上面的几个属性外&#xff0c;ThreadPoolExecutor还有下面的几个参数。

//ctl是一个AtomicInteger类型的原子对象。ctl记录了线程池中的任务数量和线程池状态。
private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
//线程池的锁
private final ReentrantLock mainLock &#61; new ReentrantLock();
//所有工作的线程
private final HashSet workers &#61; new HashSet();
//支持的等待condition
private final Condition termination &#61; mainLock.newCondition();
//线程池中线程数量曾经达到过的最大值
private int largestPoolSize;
//已完成任务数量
private long completedTaskCount;
//是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;

排队策略

排队有三种通用策略&#xff1a;
SynchronousQueue
它将任务直接传输给工作队列workers&#xff0c;而不保持任务。如果不存在空闲线程&#xff0c;则会新建一个线程来执行任务。比如&#xff0c;在Executors.newCachedThreadPool()方法中使用的就是此策略。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());
}

LinkedBlockingQueue
有界队列&#xff0c;使用此队列会导致在所有corePoolSize线程都忙时新任务在队列中等待。这样&#xff0c;创建的线程就不会超过corePoolSize。比如&#xff0c;在Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()方法中使用的就是此策略。

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}

ArrayBlockingQueue
有界队列&#xff0c;没见到在哪里用到了这种策略。


线程池状态

源码已经告诉了我们线程池有几个状态。

private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS &#61; Integer.SIZE - 3;
private static final int CAPACITY &#61; (1 <1;// runState is stored in the high-order bits
private static final int RUNNING &#61; -1 <private static final int SHUTDOWN &#61; 0 <private static final int STOP &#61; 1 <private static final int TIDYING &#61; 2 <private static final int TERMINATED &#61; 3 <

可以看出&#xff0c;一共有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。ctl对象一共32位&#xff0c;高3位保存线程池状态信息&#xff0c;后29位保存线程池容量信息。线程池的初始化状态是RUNNING&#xff0c;在源码中体现为private final AtomicInteger ctl &#61; new AtomicInteger(ctlOf(RUNNING, 0));


状态高三位工作队列workers中的任务阻塞队列workQueue中的任务未添加的任务
RUNNING111继续处理继续处理添加
SHUTDOWN000继续处理继续处理不添加
STOP001尝试中断不处理不添加
TIDYING010处理完了如果由SHUTDOWN - TIDYING &#xff0c;那就是处理完了&#xff1b;如果由STOP - TIDYING &#xff0c;那就是不处理不添加
TERMINATED011同TIDYING同TIDYING同TIDYING

各个状态的转换图如下所示
MarkdownPhotos/master/CSDNBlogs/concurrency/0602/status.png


执行任务

execute(Runnable)

/** 在将来某个时间执行给定任务。* 可以在新线程中或者在现有池线程中执行该任务。* 如果无法将任务提交执行&#xff0c;或者因为此执行程序已关闭&#xff0c;或者因为已达到其容量&#xff0c;* 则该任务由当前 RejectedExecutionHandler 处理。*/
public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();int c &#61; ctl.get();//分三种情况处理//case1&#xff1a;如果线程池中运行的线程数量if (workerCountOf(c) //创建新线程来处理请求&#xff0c;即使其他辅助线程是空闲的if (addWorker(command, true))return;c &#61; ctl.get();}//case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();//再次检查线程池的状态&#xff0c;如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务if (! isRunning(recheck) && remove(command))//该任务由当前 RejectedExecutionHandler 处理reject(command);//如果线程池中运行的线程数量为0else if (workerCountOf(recheck) &#61;&#61; 0)//则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。addWorker(null, false);}//case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败else if (!addWorker(command, false))//该任务由当前 RejectedExecutionHandler 处理reject(command);
}

看完后&#xff0c;我们知道execute()分三种情况处理任务

case1&#xff1a;如果线程池中运行的线程数量 case2&#xff1a;如果线程池中运行的线程数量>&#61;corePoolSize&#xff0c;且线程池处于RUNNING状态&#xff0c;且把提交的任务成功放入阻塞队列中&#xff0c;就再次检查线程池的状态&#xff0c;1.如果线程池不是RUNNING状态&#xff0c;且成功从阻塞队列中删除任务&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。2.否则如果线程池中运行的线程数量为0&#xff0c;则通过addWorker(null, false)尝试新建一个线程&#xff0c;新建线程对应的任务为null。
case3&#xff1a;如果以上两种case不成立&#xff0c;即没能将任务成功放入阻塞队列中&#xff0c;且addWoker新建线程失败&#xff0c;则该任务由当前 RejectedExecutionHandler 处理。

submit

/*** 提交一个 Runnable 任务用于执行&#xff0c;并返回一个表示该任务的 Future。* 该 Future 的 get 方法在 成功 完成时将会返回 null。*/
public Future submit(Runnable task) {if (task &#61;&#61; null) throw new NullPointerException();RunnableFuture ftask &#61; newTaskFor(task, null);execute(ftask);return ftask;
}

可以看到此方法是通过调用execute(Runnable)实现的。


关闭线程池

ThreadPoolExecutor提供了shutdown()和shutdownNow()两个方法来关闭线程池。shutdown() 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;但是不接受新任务。shutdownNow()尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。

shutdown()

/** 按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;不接受新任务。* 如果已经关闭&#xff0c;则调用没有其他作用。*/
public void shutdown() {final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.将线程池状态设为SHUTDOWNadvanceRunState(SHUTDOWN);//step4.中断所有空闲线程interruptIdleWorkers();//step5.钩子函数&#xff0c;没有执行任何操作onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();
}

将shutdown()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.将线程池状态设为SHUTDOWN。一旦将线程池状态设为SHUTDOWN&#xff0c;就不能像线程池中添加新任务了。
step4.中断所有空闲线程
step5.钩子函数&#xff0c;没有执行任何操作
step6.释放锁
step7.将线程池状态设置为TERMINATED

shutdownNow()

/** 尝试停止所有的活动执行任务、暂停等待任务的处理&#xff0c;并返回等待执行的任务列表。* /
public List shutdownNow() {List tasks;final ReentrantLock mainLock &#61; this.mainLock;//step1.获取独占锁mainLock.lock();try {//step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池checkShutdownAccess();//step3.尝试停止所有的活动执行任务advanceRunState(STOP);//step4.暂停等待任务的处理interruptWorkers();//step5.获取等待执行的任务列表tasks &#61; drainQueue();} finally {//step6.释放锁mainLock.unlock();}//step7.将线程池状态设置为TERMINATEDtryTerminate();//step8.返回等待执行的任务列表return tasks;
}

将shutdownNow()方法总结如下
step1.获取独占锁
step2.如果有安全管理器&#xff0c;使用安全管理器检查当前线程是否有权限关闭线程池&#xff0c;如果没有权限则抛出SecurityException。
step3.尝试停止所有的活动执行任务
step4.暂停等待任务的处理
step5.获取等待执行的任务列表
step6.释放锁
step7.将线程池状态设置为TERMINATED
step8.返回等待执行的任务列表

shutdown()和shutdownNow()的区别


  • 调用shutdown()后&#xff0c;线程池状态立刻变为SHUTDOWN&#xff0c;而调用shutdownNow()&#xff0c;线程池状态立刻变为STOP。
  • shutdown()通过中断空闲线程、不接受新任务的方式按过去执行已提交任务的顺序发起一个有序的关闭&#xff0c;shutdownNow()无差别地停止所有的活动执行任务&#xff0c;暂停等待任务的处理。也就是说&#xff0c;shutdown()等待任务执行完才中断线程&#xff0c;而shutdownNow()不等任务执行完就中断了线程。

本文就先讲到这里&#xff0c;想了解Java并发编程更多内容请参考&#xff1a;


  • Java并发编程札记-目录

推荐阅读
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • Spring框架《一》简介
    Spring框架《一》1.Spring概述1.1简介1.2Spring模板二、IOC容器和Bean1.IOC和DI简介2.三种通过类型获取bean3.给bean的属性赋值3.1依赖 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • 1Lock与ReadWriteLock1.1LockpublicinterfaceLock{voidlock();voidlockInterruptibl ... [详细]
  • 一次上线事故,30岁+的程序员踩坑经验之谈
    本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ... [详细]
author-avatar
刘海龙_高安大城
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有