热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

JavaThreadPoolExecutor线程池的使用介绍

Executors是一个Java中的工具类.提供工厂方法来创建不同类型的线程池,这篇文章主要介绍了JavaThreadPoolExecutor线程池的使用介绍,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

Executors

Executors 是一个Java中的工具类. 提供工厂方法来创建不同类型的线程池.

从上图中也可以看出, Executors的创建线程池的方法, 创建出来的线程池都实现了 ExecutorService接口. 常用方法有以下几个:

  • newFixedThreadPool(int Threads): 创建固定数目线程的线程池, 超出的线程会在队列中等待.
  • newCachedThreadPool(): 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程(60秒), 若无可回收,则新建线程.
  • newSingleThreadExecutor(): 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行. 如果某一个任务执行出错, 将有另一个线程来继续执行.
  • newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池, 多数情况下可用来替代Timer类.

Executors 例子

newCachedThreadPool

线程最大数为 Integer.MAX_VALUE, 当我们往线程池添加了 n 个任务, 这 n 个任务都是一起执行的.

  ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

newFixedThreadPool

  ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

newScheduledThreadPool

三秒执行一次, 只有执行完这一次后, 才会执行.

 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
    scheduledExecutorService.schedule(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(2000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

顺序执行各个任务, 第一个任务执行完, 才会执行下一个.

ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            System.out.println(Thread.currentThread().getName());
            Thread.currentThread().sleep(10000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

    executorService.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            System.out.println(Thread.currentThread().getName());
            Thread.currentThread().sleep(2);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

Executors存在什么问题

在阿里巴巴Java开发手册中提到,使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出),但是并没有说明为什么,那么接下来我们就来看一下到底为什么不允许使用Executors?

我们先来一个简单的例子,模拟一下使用Executors导致OOM的情况.

/**
 * @author Hollis
 */
public class ExecutorsDemo {
  private static ExecutorService executor = Executors.newFixedThreadPool(15);
  public static void main(String[] args) {
    for (int i = 0; i 

通过指定JVM参数:-Xmx8m -Xms8m 运行以上代码,会抛出OOM:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

以上代码指出,ExecutorsDemo.java 的第16行,就是代码中的 executor.execute(new SubThread());

Java中的 BlockingQueue 主要有两种实现, 分别是 ArrayBlockingQueueLinkedBlockingQueue.

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列, 必须设置容量.

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列, 容量可以选择进行设置, 不设置的话, 将是一个无边界的阻塞队列, 最大长度为 Integer.MAX_VALUE.

public LinkedBlockingQueue() {
  this(Integer.MAX_VALUE);
}

这里的问题就出在如果我们不设置 LinkedBlockingQueue 的容量的话, 其默认容量将会是 Integer.MAX_VALUE.

newFixedThreadPool 中创建 LinkedBlockingQueue 时, 并未指定容量. 此时, LinkedBlockingQueue 就是一个无边界队列, 对于一个无边界队列来说, 是可以不断的向队列中加入任务的, 这种情况下就有可能因为任务过多而导致内存溢出问题.

newCachedThreadPoolnewScheduledThreadPool 这两种方式创建的最大线程数可能是Integer.MAX_VALUE, 而创建这么多线程, 必然就有可能导致OOM.

ThreadPoolExecutor 创建线程池

避免使用 Executors 创建线程池, 主要是避免使用其中的默认实现, 那么我们可以自己直接调用 ThreadPoolExecutor 的构造函数来自己创建线程池. 在创建的同时, 给 BlockQueue 指定容量就可以了.

ExecutorService executor = new ThreadPoolExecutor(10, 10,
    60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue(10));

这种情况下, 一旦提交的线程数超过当前可用线程数时, 就会抛出 java.util.concurrent.RejectedExecutionException, 这是因为当前线程池使用的队列是有边界队列, 队列已经满了便无法继续处理新的请求.

除了自己定义 ThreadPoolExecutor 外. 还有其他方法. 如apache和guava等.

四个构造函数

  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue workQueue)
       
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue workQueue,
               ThreadFactory threadFactory)
       
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue workQueue,
               RejectedExecutionHandler handler)
       
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler)

int corePoolSize => 该线程池中核心线程数最大值

线程池新建线程的时候,如果当前线程总数小于corePoolSize, 则新建的是核心线程, 如果超过corePoolSize, 则新建的是非核心线程

核心线程默认情况下会一直存活在线程池中, 即使这个核心线程啥也不干(闲置状态).

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 这个属性为 true, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉

很好理解吧, 正常情况下你不干活我也养你, 因为我总有用到你的时候, 但有时候特殊情况(比如我自己都养不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

long keepAliveTime
该线程池中非核心线程闲置超时时长

一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉

如果设置 allowCoreThreadTimeOut = true, 则会作用于核心线程

TimeUnit unit

keepAliveTime的单位, TimeUnit是一个枚举类型, 其包括:

TimeUnit.DAYS;        //天
TimeUnit.HOURS;       //小时
TimeUnit.MINUTES;      //分钟
TimeUnit.SECONDS;      //秒
TimeUnit.MILLISECONDS;   //毫秒
TimeUnit.MICROSECONDS;   //微妙
TimeUnit.NANOSECONDS;    //纳秒

BlockingQueue workQueue

一个阻塞队列, 用来存储等待执行的任务. 也就是说现在有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workQueue 中, 等待执行.

这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 一般来说, 这里的阻塞队列有以下几种选择:

SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办? 那就*新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误, 使用这个类型队列的时候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即无限大.

LinkedBlockingQueue: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则核心线程处理任务; 如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列最大值为 Integer.MAX_VALUE , 即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效, 因为总线程数永远不会超过 corePoolSize.

ArrayBlockingQueue: 可以限定队列的长度, 接收到任务的时候, 如果没有达到 corePoolSize 的值, 则核心线程执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumPoolSize, 并且队列也满了, 则发生错误.

DelayQueue: 队列内元素必须实现 Delayed 接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.

ThreadFactory threadFactory

它是ThreadFactory类型的变量, 用来创建新线程.

默认使用 Executors.defaultThreadFactory() 来创建线程. 使用默认的 ThreadFactory 来创建线程时, 会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程, 同时也设置了线程的名称.

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略, 有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认).
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务, 然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 如何提高PHP编程技能及推荐高级教程
    本文介绍了如何提高PHP编程技能的方法,推荐了一些高级教程。学习任何一种编程语言都需要长期的坚持和不懈的努力,本文提醒读者要有足够的耐心和时间投入。通过实践操作学习,可以更好地理解和掌握PHP语言的特异性,特别是单引号和双引号的用法。同时,本文也指出了只走马观花看整体而不深入学习的学习方式无法真正掌握这门语言,建议读者要从整体来考虑局部,培养大局观。最后,本文提醒读者完成一个像模像样的网站需要付出更多的努力和实践。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • Android日历提醒软件开源项目分享及使用教程
    本文介绍了一款名为Android日历提醒软件的开源项目,作者分享了该项目的代码和使用教程,并提供了GitHub项目地址。文章详细介绍了该软件的主界面风格、日程信息的分类查看功能,以及添加日程提醒和查看详情的界面。同时,作者还提醒了读者在使用过程中可能遇到的Android6.0权限问题,并提供了解决方法。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
  • 目录浏览漏洞与目录遍历漏洞的危害及修复方法
    本文讨论了目录浏览漏洞与目录遍历漏洞的危害,包括网站结构暴露、隐秘文件访问等。同时介绍了检测方法,如使用漏洞扫描器和搜索关键词。最后提供了针对常见中间件的修复方式,包括关闭目录浏览功能。对于保护网站安全具有一定的参考价值。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • Apache Shiro 身份验证绕过漏洞 (CVE202011989) 详细解析及防范措施
    本文详细解析了Apache Shiro 身份验证绕过漏洞 (CVE202011989) 的原理和影响,并提供了相应的防范措施。Apache Shiro 是一个强大且易用的Java安全框架,常用于执行身份验证、授权、密码和会话管理。在Apache Shiro 1.5.3之前的版本中,与Spring控制器一起使用时,存在特制请求可能导致身份验证绕过的漏洞。本文还介绍了该漏洞的具体细节,并给出了防范该漏洞的建议措施。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
author-avatar
Alistar1991_281
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有