热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

java中的Executors简介与多线程在网站上逐步优化的运用案例

提供Executor的工厂类忽略了自定义的ThreadFactory、callable和unconf

提供Executor的工厂类

java 中的 Executors 简介与多线程在网站上逐步优化的运用案例

忽略了自定义的ThreadFactory、callable和unconfigurable相关的方法

  • newFixedxxx:在任意时刻,最多有nThreads个线程在处理task;如果所有线程都在运行时来了新的任务,它会被扔入队列;如果有线程在执行期间因某种原因终止了运行,如果需要执行后续任务,新的线程将取代它

    return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue());
    
  • newCachedxxx:新任务到来如果线程池中有空闲的线程就复用,否则新建一个线程。如果一个线程超过60秒没有使用,它就会被关闭移除线程池

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue());
    
  • newSingleThreadExecutor:仅使用一个线程来处理任务,如果这线程挂了,会产生一个新的线程来代替它。每一个任务被保证按照顺序执行,而且一次只执行一个

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue()));
        }
    

    使用newFixedxxx方法也能实现类似的作用,但是ThreadPoolExecutor会提供修改线程数的方法,FinalizableDelegatedExecutorService则没有修改的途径,它在DelegatedExecutorService的基础 上仅提供了执行finalize时候去关闭线程,而DelegatedExecutorService仅暴漏ExecutorService自身的方法

  • newScheduledThreadPool:提供一个线程池来延迟或者定期执行任务

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
  • newSingleThreadScheduledExecutor:提供单个线程来延迟或者定期执行任务,如果执行的线程挂了,会生成新的。

    return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
    

    同样,它保证返回的Executor自身的线程数不可修改

从上述的实现可以看出,核心在于三个部分

  • ThreadPoolExecutor:提供线程数相关的控制
  • DelegatedExecutorService:仅暴露ExecutorService自身的方法,保证线程数不变来实现语义场景
  • ScheduledExecutorService:提供延迟或者定期执行的功能

对应的,相应也有不同的队列去实现不同的场景

  • LinkedBlockingQueue:无界阻塞队列
  • SynchronousQueue:没有消费者消费时,新的任务就会被阻塞
  • DelayQueue:队列中的任务过期之后才可以执行,否则无法查询到队列中的元素

DelegatedExecutorService

它仅仅是包装了ExecutorService的方法,交由传入的ExecutorService来执行,所谓的UnConfigurable实际也就是它没有暴漏配置各种参数调整的方法

static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future submit(Runnable task) {
            return e.submit(task);
        }
        public  Future submit(Callable task) {
            return e.submit(task);
        }
        public  Future submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public  List> invokeAll(Collection> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public  List> invokeAll(Collection> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public  T invokeAny(Collection> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public  T invokeAny(Collection> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }

ScheduledExecutorService

提供一系列的schedule方法,使得任务可以延迟或者周期性的执行,对应schedule方法会返回ScheduledFuture以供确认是否执行以及是否要取消。它的实现ScheduledThreadPoolExecutor也支持立即执行由submit提交的任务

仅支持相对延迟时间,比如距离现在5分钟后执行。类似Timer也可以管理延迟任务和周期任务,但是存在一些缺陷:

ScheduledExecutorService的多线程机制可弥补
1:可以使用try-catch-finally对相应执行快处理;2:通过execute执行的方法可以设置UncaughtExceptionHandler来接收未捕获的异常,并作出处理;3:通过submit执行的,将被封装层ExecutionException重新抛出

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize <0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize  
 
  • corePoolSize、maximumPoolSize:ThreadPoolExecutor会根据这两自动调整线程池的大小,当一个新任务通过execute提交的时候:
    如果当前运行的线程数小于corePoolSize就新建线程;
    如果当前线程数在corePoolSize与maximumPoolSize之间,则只有在队列满的时候才会创建新的线程;
    如果已经达到最大线程数,并且队列都满了,在这种饱和状态下就会执行拒绝策略

    默认情况下,只有新任务到达的时候才会启动线程,可通过prestartCoreThread方法实现事先启动

    1. corePoolSize:默认线程池所需要维护的最小的worker的数量,就算是worker过期了也会保留。如果想要不保留,则需要设置allowCoreThreadTimeOut,此时最小的就是0
    2. maximumPoolSize:线程池最大的线程数。java限制最多为 2^29-1,大约5亿个
  • keepAliveTime、unit:如果当前线程池有超过corePoolSize的线程数,只要有线程空闲时间超过keepAliveTime的设定,就会被终止;unit则是它的时间单位
  • workQueue:任何BlockingQueue都可以使用,基本上有三种
    1. Direct handoffs,直接交付任务。比如 SynchronousQueue,如果没有线程消费,提交任务会失败,当然可以新建一个线程来处理。它适合处理有依赖关系的任务,一般它的maximumPoolSizes会被设置成最大的
    2. Unbounded queues,无界队列。比如LinkedBlockingQueue,这意味着如果有corePoolSize个线程在执行,那么其他的任务都只能等待。它适合于处理任务都是互相独立的,
    3. Bounded queues,有界队列。比如ArrayBlockingQueue,需要考虑队列大小和最大线程数之间的关系,来达到更好的资源利用率和吞吐量
  • threadFactory:没有指定的时候,使用 Executors.defaultThreadFactory
  • RejectedExecutionHandler:通过execute添加的任务,如果Executor已经关闭或者已经饱和了(线程数达到了maximumPoolSize,并且队列满了),就会执行,java提供了4种策略:
    1. AbortPolicy,拒绝的时候抛出运行时异常RejectedExecutionException;
    2. CallerRunsPolicy,如果executor没有关闭,那么由调用execute的线程来执行它;
    3. DiscardPolicy,直接扔掉新的任务;
    4. DiscardOldestPolicy,如果executor没有关闭,那么扔掉队列头部的任务,再次尝试;

ThreadPoolExecutor可自定义beforeExecutor、afterExecutor可以用来添加日志统计、计时、件事或统计信息收集功能,无论run是正常返回还是抛出异常,afterExecutor都会被执行。如果beforeExecutor抛出RuntimeException,任务和afterExecutor都不会被执行。terminated在所有任务都已经完成,并且所有工作者线程关闭后会调用,此时也可以用来执行发送通知、记录日志等等。

如何估算线程池的大小

  1. 计算密集型,通常在拥有 个处理器的系统上,线程池大小设置为 能够实现最优的利用率;

    cpu的个数

  2. I/O密集型或者其它阻塞型的任务,定义 为CPU的个数, 为CPU的利用率, 为等待时间与计算时间的比率,此时线程池的最优大小为

场景说明

将一个网站的业务抽象成如下几块

  • 接收客户端请求与处理请求
  • 页面渲染返回的文本和图片
  • 获取页面的广告

接收请求与处理请求

理论模型

理论上,服务端通过实现约定的接口就可以实现接收请求和处理连续不断的请求过来

ServerSocket socket = new ServerSocket(80);
while(true){
    Socket cOnn= socket.accept();
    handleRequest(conn)
}

缺点:每次只能处理一个请求,新请求到来时,必须等到正在处理的请求处理完成,才能接收新的请求

显示的创建多线程

为每个请求创建新的线程提供服务

ServerSocket socket = new ServerSocket(80);
while(true){
    final Socket cOnn= socket.accept();
    Runnable task = new Runnable(){
        public void run(){
            handleRequest(conn);        
        }
    }
    new Thread(task).start();
}

缺点:

  • 线程的创建和销毁都有一定的开销,延迟对请求的处理;
  • 创建后的线程多于可用处理器的数量,造成线程闲置,这会给垃圾回收带来压力
  • 存活的大量线程竞争CPU资源会产生很多性能开销
  • 系统上对可创建的线程数存在限制

使用线程池

使用 java 自带的Executor框架。

private static final Executor exec = Executors.newFixedThreadPool(100);
...
ServerSocket socket = new ServerSocket(80);
while(true){
    final Socket cOnn= socket.accept();
    Runnable task = new Runnable(){
        public void run(){
            handleRequest(conn);        
        }
    }
    exec.execute(task);
}
...

线程池策略通过实现预估好的线程需求,限制并发任务的数量,重用现有的线程,解决每次创建线程的资源耗尽、竞争过于激烈和频繁创建的问题,也囊括了线程的优势,解耦了任务提交和任务执行。

页面渲染返回的文本和图片

串行渲染

renderText(source);
List imageData = new ArrayList();
for(ImageInfo info:scaForImageInfo(source)){
    imageData.add(info.downloadImage());
}
for(ImageData data:imageData){
    renderImage(data);
}

缺点:图像的下载大部分时间在等待I/O操作执行完成,这期间CPU几乎不做任何工作,使得用户看到最终页面之前要等待过长的时间

并行化

渲染过程可以分成两个部分,1是渲染文本,1是下载图像

private static final ExecutorService exec = Executors.newFixedThreadPool(100);
...
final List infos=scaForImageInfo(source);
Callable> task=new Callable>(){
    public List call(){
        List r = new ArrayList();
        for(ImageInfo info:infos){
            r.add(info.downloadImage());
        }
        return r;
    }
};
Future> future = exec.submit(task);
renderText(source);
try{
    List imageData = future.get();
    for(ImageData data:imageData){
        renderImage(data);
    }
}catch(InterruptedException e){
    Thread.currentThread().interrupt();
    future.cancel(true);
}catche(ExecutionException e){
    throw launderThrowable(e.getCause());
}

使用Callable来返回下载的图片结果,使用future来获得下载的图片,这样将减少用户所需要的等待时间。

缺点:图片的下载很明显时间要比文本要慢,这样的并行化很可能速度可能只提升了1%

并行性能提升

使用CompletionService。

private static final ExecutorService exec;
...
final List infos=scaForImageInfo(source);
CompletionService cService =  new ExecutorCompletionService(exec);
for(final ImageInfo info:infos){
    cService.submit(new Callable(){
        public ImageData call(){
            return info.downloadImage();
        }
    });
}
renderText(source);
try{
    for(int i=0,n=info.size();t f = cService.take();
        ImageData imageData=f.get();
        renderImage(imageData)
    }
}catch(InterruptedException e){
    Thread.currentThread().interrupt();
}catche(ExecutionException e){
    throw launderThrowable(e.getCause());
}

核心思路为为每一幅图像下载都创建一个独立的任务,并在线程池中执行他们,从而将串行的下载过程转换为并行的过程

获取页面的广告

广告展示如果在一定的时间以内没有获取,可以不再展示,并取消超时的任务。

ExecutorService exe = Executors.newFixedThreadPool(3);
        List myTasks = new ArrayList<>();
        for (int i=0;i<3;i++){
          myTasks.add(new MyTask(3-i));
        }

        try {

            List> futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS);
            for (int i=0;i 
 

invokeAll方法对于没有完成的任务会被取消,通过CancellationException可以捕获,invokeAll返回的序列顺序和传入的task保持一致。结果如下:

task sleep 3 not execute ,because java.util.concurrent.CancellationException
task sleep 2 not execute ,because java.util.concurrent.CancellationException
task execut 1 s

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 我们


推荐阅读
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • LeetCode笔记:剑指Offer 41. 数据流中的中位数(Java、堆、优先队列、知识点)
    本文介绍了LeetCode剑指Offer 41题的解题思路和代码实现,主要涉及了Java中的优先队列和堆排序的知识点。优先队列是Queue接口的实现,可以对其中的元素进行排序,采用小顶堆的方式进行排序。本文还介绍了Java中queue的offer、poll、add、remove、element、peek等方法的区别和用法。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文记录了在vue cli 3.x中移除console的一些采坑经验,通过使用uglifyjs-webpack-plugin插件,在vue.config.js中进行相关配置,包括设置minimizer、UglifyJsPlugin和compress等参数,最终成功移除了console。同时,还包括了一些可能出现的报错情况和解决方法。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
author-avatar
wen260693700
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有