热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ThreadPoolExecutor源码解析(线程池的工作原理)

资料使用ThreadPoolExecutor遇到的核心线程被阻塞,非核心线程未按照预期运行问题简单分析ThreadPoolExecutor回收工作线程的原理线程有
资料

使用ThreadPoolExecutor遇到的核心线程被阻塞,非核心线程未按照预期运行问题
简单分析ThreadPoolExecutor回收工作线程的原理

线程有几种状态

public enum State {/*** Thread state for a thread which has not yet started. 尚未启动的线程的线程状态。*/NEW,/*** Thread state for a runnable thread. A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor. * 可运行线程的线程状态。处于可运行状态的线程正在 Java 虚拟机中执行,但它可能正在等待来自操作系统的其他资源,例如处理器。*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {@link Object#wait() Object.wait}.* 线程阻塞等待监视器锁的线程状态。处于阻塞状态的线程在调用后等待监视器锁进入同步块/方法*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* 等待线程的线程状态。由于调用以下方法之一,线程处于等待状态:*

    *
  • {@link Object#wait() Object.wait} with no timeout
  • *
  • {@link #join() Thread.join} with no timeout
  • *
  • {@link LockSupport#park() LockSupport.park}
  • *
**

A thread in the waiting state is waiting for another thread to* perform a particular action.* 处于等待状态的线程正在等待另一个线程执行特定操作。** For example, a thread that has called {@code Object.wait()}* on an object is waiting for another thread to call* {@code Object.notify()} or {@code Object.notifyAll()} on* that object. A thread that has called {@code Thread.join()}* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* 具有指定等待时间的等待线程的线程状态。由于以指定的正等待时间调用以下方法之一,线程处于定时等待状态:*

    *
  • {@link #sleep Thread.sleep}
  • *
  • {@link Object#wait(long) Object.wait} with timeout
  • *
  • {@link #join(long) Thread.join} with timeout
  • *
  • {@link LockSupport#parkNanos LockSupport.parkNanos}
  • *
  • {@link LockSupport#parkUntil LockSupport.parkUntil}
  • *
*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.* 已终止线程的线程状态。线程已完成执行。*/TERMINATED;}

1. ThreadPoolExecutor构造函数

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}int corePoolSize, // 核心线程
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 持续时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞线程
ThreadFactory threadFactory //工厂方法

2. public void execute(Runnable command)

public void execute(Runnable command) {if (command &#61;&#61; null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn&#39;t, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c &#61; ctl.get();// 小于核心线程&#xff0c;直接加入if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c &#61; ctl.get();}// 判断是否加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck &#61; ctl.get();if (! isRunning(recheck) && remove(command))reject(command); // 拒绝策略else if (workerCountOf(recheck) &#61;&#61; 0)addWorker(null, false);}else if (!addWorker(command, false)) // 尝试加入非核心线程reject(command); // 没有加入成功&#xff0c;拒绝策略}

总结&#xff1a;

  1. 先尝试加入核心线程&#xff08;如果还没超过核心线程数&#xff09;
  2. 再尝试加入阻塞队列&#xff0c;如果没有SHUTDOWN&#xff0c;判断核心运行为0&#xff0c;则再次运行
  3. 最后加入非核心线程&#xff0c;没有加入&#xff0c;怎拒绝策略

3. addWorker

/** Methods for creating, running and cleaning up after workers*//*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** &#64;param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** &#64;param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* &#64;return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c &#61; ctl.get();;) {// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask !&#61; null|| workQueue.isEmpty()))return false;for (;;) {// 如果不满足核心和非核心线程数&#xff0c;就falseif (workerCountOf(c)>&#61; ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c &#61; ctl.get(); // Re-read ctlif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted &#61; false;boolean workerAdded &#61; false;Worker w &#61; null;try {// 创建一个Worker工作类w &#61; new Worker(firstTask);final Thread t &#61; w.thread;if (t !&#61; null) {final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c &#61; ctl.get();if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask &#61;&#61; null)) {if (t.getState() !&#61; Thread.State.NEW)throw new IllegalThreadStateException();workers.add(w);workerAdded &#61; true;int s &#61; workers.size();if (s > largestPoolSize)largestPoolSize &#61; s;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted &#61; true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

总结&#xff1a;
1. 判断是否在核心和非核心线程内
2. 创建一个Worker
3. 添加到workers线程中
4. 开启Worker中Thread的线程

4. Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID &#61; 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;// TODO: switch to AbstractQueuedLongSynchronizer and move// completedTasks into the lock word./*** Creates with given first task and thread from ThreadFactory.* &#64;param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask &#61; firstTask;this.thread &#61; getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {// 真正运行线程开始runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() !&#61; 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >&#61; 0 && (t &#61; thread) !&#61; null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

总结&#xff1a;

  1. this.thread &#61; getThreadFactory().newThread(this); 要运行Worker里面的run方法
  2. run中运行外面的runWorker方法
  3. firstTask也就起到一个保存&#xff0c;和被调用的作用

5. runWorker

/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don&#39;t need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread&#39;s* UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread&#39;s UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** &#64;param w the worker*/final void runWorker(Worker w) {Thread wt &#61; Thread.currentThread();Runnable task &#61; w.firstTask;w.firstTask &#61; null;w.unlock(); // allow interruptsboolean completedAbruptly &#61; true;try {// 1. task不为空// 2. getTask 从阻塞列表中获取while (task !&#61; null || (task &#61; getTask()) !&#61; null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task &#61; null;w.completedTasks&#43;&#43;;w.unlock();}}completedAbruptly &#61; false;} finally {processWorkerExit(w, completedAbruptly);}}

总结;

  1. task不为空&#xff0c;就直接执行task
  2. task为空(firstTask&#61;&#61;null)&#xff0c;通过getTask获取任务
  3. 此时runWorker在Thread线程里
  4. 运行完之后调用processWorkerExit(…

getTask

/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to* a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out* workers are subject to termination (that is,* {&#64;code allowCoreThreadTimeOut || workerCount > corePoolSize})* both before and after the timed wait, and if the queue is* non-empty, this worker is not the last thread in the pool.** &#64;return task, or null if the worker must exit, in which case* workerCount is decremented*/private Runnable getTask() {boolean timedOut &#61; false; // Did the last poll() time out?for (;;) {int c &#61; ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc &#61; workerCountOf(c);// Are workers subject to culling?boolean timed &#61; allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 从阻塞队列中获取&#xff0c; 此处有用到延迟消息// poll(keepAliveTime 是延迟获取// take() 是阻塞获取Runnable r &#61; timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r !&#61; null)return r;timedOut &#61; true;} catch (InterruptedException retry) {timedOut &#61; false;}}}

总结:

  1. 从阻塞队列中获取&#xff0c; 此处有用到延迟消息
  2. poll(keepAliveTime 是延迟获取
  3. take() 是阻塞获取

processWorkerExit(w, completedAbruptly);

/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** &#64;param w the worker* &#64;param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn&#39;t adjusteddecrementWorkerCount();final ReentrantLock mainLock &#61; this.mainLock;mainLock.lock();try {completedTaskCount &#43;&#61; w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c &#61; ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min &#61; allowCoreThreadTimeOut ? 0 : corePoolSize;if (min &#61;&#61; 0 && ! workQueue.isEmpty())min &#61; 1;if (workerCountOf(c) >&#61; min)return; // replacement not needed}addWorker(null, false);}}

总结&#xff1a;

  1. 回收Worker
  2. 通过addWorker添加到worker集合中

怎样回收线程

addWorkder(null, false);
添加一个null的runnable, 当为空时候,会从队列中去取.


推荐阅读
  • IjustinheritedsomewebpageswhichusesMooTools.IneverusedMooTools.NowIneedtoaddsomef ... [详细]
  • Spring常用注解(绝对经典),全靠这份Java知识点PDF大全
    本文介绍了Spring常用注解和注入bean的注解,包括@Bean、@Autowired、@Inject等,同时提供了一个Java知识点PDF大全的资源链接。其中详细介绍了ColorFactoryBean的使用,以及@Autowired和@Inject的区别和用法。此外,还提到了@Required属性的配置和使用。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • 本文介绍了九度OnlineJudge中的1002题目“Grading”的解决方法。该题目要求设计一个公平的评分过程,将每个考题分配给3个独立的专家,如果他们的评分不一致,则需要请一位裁判做出最终决定。文章详细描述了评分规则,并给出了解决该问题的程序。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 本文探讨了C语言中指针的应用与价值,指针在C语言中具有灵活性和可变性,通过指针可以操作系统内存和控制外部I/O端口。文章介绍了指针变量和指针的指向变量的含义和用法,以及判断变量数据类型和指向变量或成员变量的类型的方法。还讨论了指针访问数组元素和下标法数组元素的等价关系,以及指针作为函数参数可以改变主调函数变量的值的特点。此外,文章还提到了指针在动态存储分配、链表创建和相关操作中的应用,以及类成员指针与外部变量的区分方法。通过本文的阐述,读者可以更好地理解和应用C语言中的指针。 ... [详细]
  • 本文讨论了如何使用IF函数从基于有限输入列表的有限输出列表中获取输出,并提出了是否有更快/更有效的执行代码的方法。作者希望了解是否有办法缩短代码,并从自我开发的角度来看是否有更好的方法。提供的代码可以按原样工作,但作者想知道是否有更好的方法来执行这样的任务。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 本文介绍了在iOS开发中使用UITextField实现字符限制的方法,包括利用代理方法和使用BNTextField-Limit库的实现策略。通过这些方法,开发者可以方便地限制UITextField的字符个数和输入规则。 ... [详细]
  • 本文介绍了Oracle存储过程的基本语法和写法示例,同时还介绍了已命名的系统异常的产生原因。 ... [详细]
author-avatar
ruigh
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有