热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

多线程编程学习六(Java中的阻塞队列).

介绍阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满;当队列空时,队列会阻塞获得元素的线程,直到队列变非空。阻塞队列就是生产者用来存放元

介绍

阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满;当队列空时,队列会阻塞获得元素的线程,直到队列变非空。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

当线程 插入/获取 动作由于队列 满/空 阻塞后,队列也提供了一些机制去处理,或抛出异常,或返回特殊值,或者线程一直等待...

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, timeout, unit)
移除方法 remove(o) poll() take() poll(timeout, unit)
检查方法 element() peek() — 不移除元素 不可用 不可用

tips: 如果是***阻塞队列,则 put 方法永远不会被阻塞;offer 方法始终返回 true。

Java 中的阻塞队列:

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序,默认情况下不保证线程公平的访问。

通过可重入的独占锁 ReentrantLock 来控制并发,Condition 来实现阻塞。

public class ArrayBlockingQueueTest {

    /**
     * 1. 由于是有界阻塞队列,需要设置初始大小
     * 2. 默认不保证阻塞线程的公平访问,可设置公平性
     */
    private static ArrayBlockingQueue QUEUE = new ArrayBlockingQueue<>(2, true);

    public static void main(String[] args) throws InterruptedException {

        Thread put = new Thread(() -> {
            // 3. 尝试插入元素
            try {
                QUEUE.put("java");
                QUEUE.put("Javascript");
                // 4. 元素已满,会阻塞线程
                QUEUE.put("c++");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        put.start();
        Thread take = new Thread(() -> {
            try {
                // 5. 获取一个元素
                System.out.println(QUEUE.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        take.start();
        // 6 Javascript、c++
        System.out.println(QUEUE.take());
        System.out.println(QUEUE.take());
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 是一个用单向链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

和 ArrayBlockingQueue 一样,采用 ReentrantLock 来控制并发,不同的是它使用了两个独占锁来控制消费和生产,通过 takeLock 和 putLock 两个锁来控制生产和消费,互不干扰,只要队列未满,生产线程可以一直生产;只要队列不空,消费线程可以一直消费,不会相互因为独占锁而阻塞。

tips:因为使用了双锁,避免并发计算不准确,使用了一个 AtomicInteger 变量统计元素总量。

LinkedBlockingDeque

LinkedBlockingDeque 是一个由双向链表结构组成的有界阻塞队列,可以从队列的两端插入和移出元素。它实现了BlockingDeque接口,多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以 First 单词结尾的方法,表示插入、获取或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。

LinkedBlockingDeque 的 Node 实现多了指向前一个节点的变量 prev,以此实现双向队列。并发控制上和 ArrayBlockingQueue 类似,采用单个 ReentrantLock 来控制并发。因为双端队列头尾都可以消费和生产,所以使用了一个共享锁。

双向阻塞队列可以运用在“工作窃取”模式中。

public class LinkedBlockingDequeTest {

    private static LinkedBlockingDeque DEQUE = new LinkedBlockingDeque<>(2);

    public static void main(String[] args) {
        DEQUE.addFirst("java");
        DEQUE.addFirst("c++");
        // java
        System.out.println(DEQUE.peekLast());
        // java
        System.out.println(DEQUE.pollLast());
        DEQUE.addLast("php");
        // c++
        System.out.println(DEQUE.pollFirst());
    }
}

tips: take() 方法调用的是 takeFirst(),使用时候需注意。

PriorityBlockingQueue

PriorityBlockingQueue 是一个底层由数组实现的***阻塞队列,并带有排序功能。由于是***队列,所以插入永远不会被阻塞。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。

底层同样采用 ReentrantLock 来控制并发,由于只有获取会阻塞,所以只采用一个Condition(只通知消费)来实现。

public class PriorityBlockingQueueTest {

    private static PriorityBlockingQueue QUEUE = new PriorityBlockingQueue<>();

    public static void main(String[] args) {
        QUEUE.add("java");
        QUEUE.add("Javascript");
        QUEUE.add("c++");
        QUEUE.add("python");
        QUEUE.add("php");
        Iterator it = QUEUE.iterator();
        while (it.hasNext()) {
            // c++  Javascript  java  python  php
            // 同优先级不保证排序顺序
            System.out.print(it.next() + "  ");
        }
    }
}

DelayQueue

DelayQueue 是一个支持延时获取元素的***阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,元素按延迟优先级排序,延迟时间短的排在前面,只有在延迟期满时才能从队列中提取元素。

和 PriorityBlockingQueue 相似,底层也是数组,采用一个 ReentrantLock 来控制并发。

应用场景:

  1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
  2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。
public class DelayElement implements Delayed, Runnable {

    private static final AtomicLong SEQUENCER = new AtomicLong();
    /**
     * 标识元素先后顺序
     */
    private final long sequenceNumber;
    /**
     * 延迟时间,单位纳秒
     */
    private long time;

    public DelayElement(long time) {
        this.time = System.nanoTime() + time;
        this.sequenceNumber = SEQUENCER.getAndIncrement();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.nanoTime(), NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // compare zero if same object
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayElement) {
            DelayElement x = (DelayElement) other;
            long diff = time - x.time;
            if (diff <0) {
                return -1;
            } else if (diff > 0) {
                return 1;
            } else if (sequenceNumber  0) ? 1 : 0;
    }

    @Override
    public void run() {
        System.out.println("sequenceNumber" + sequenceNumber);
    }

    @Override
    public String toString() {
        return "DelayElement{" + "sequenceNumber=" + sequenceNumber + ", time=" + time + '}';
    }
}
public class DelayQueueTest {

    private static DelayQueue QUEUE = new DelayQueue<>();

    public static void main(String[] args) {
        // 1. 添加 10 个参数
        for (int i = 1; i <10; i++) {
            // 2. 5 秒内随机延迟
            int nextInt = new Random().nextInt(5);
            long cOnvert= TimeUnit.NANOSECONDS.convert(nextInt, TimeUnit.SECONDS);
            QUEUE.offer(new DelayElement(convert));
        }
        // 3. 查询元素排序 —— 延迟短的排在前面
        Iterator iterator = QUEUE.iterator();
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
        // 4. 可观察到元素延迟输出
        while (!QUEUE.isEmpty()) {
            Thread thread = new Thread(QUEUE.poll());
            thread.start();
        }
    }
}

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的***阻塞TransferQueue队列。

并发控制上采用了大量的 CAS 操作,没有使用锁。

相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

  1. transfer : Transfers the element to a consumer, waiting if necessary to do so. 存入的元素必须等到有消费者消费才返回。
  2. tryTransfer:Transfers the element to a waiting consumer immediately, if possible. 如果有消费者正在等待消费元素,则把传入的元素传给消费者。否则立即返回 false,不用等到消费。

SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则继续 put 操作会被阻塞。

SynchronousQueue 默认情况下线程采用非公平性策略访问队列,未使用锁,全部通过 CAS 操作来实现并发,吞吐量非常高,高于 LinkedBlockingQueue 和 ArrayBlockingQueue,非常适合用来处理一些高效的传递性场景。Executors.newCachedThreadPool() 就使用了 SynchronousQueue 进行任务传递。

public class SynchronousQueueTest {

    private static class SynchronousQueueProducer implements Runnable {

        private BlockingQueue blockingQueue;

        private SynchronousQueueProducer(BlockingQueue queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println(Thread.currentThread().getName() + " Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private static class SynchronousQueueConsumer implements Runnable {

        private BlockingQueue blockingQueue;

        private SynchronousQueueConsumer(BlockingQueue queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println(Thread.currentThread().getName() + " take(): " + blockingQueue.take());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {

        final BlockingQueue synchrOnousQueue= new SynchronousQueue<>();
        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue);
        new Thread(queueProducer, "producer - 1").start();
        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue);
        new Thread(queueConsumer1, "consumer — 1").start();
        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue);
        new Thread(queueConsumer2, "consumer — 2").start();
    }
}

 
 

  1. 参考书籍:《Java 并发编程的艺术》
  2. 参考博文:https://www.cnblogs.com/konck/p/9473677.html

推荐阅读
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • MySQL中的MVVC多版本并发控制机制的应用及实现
    本文介绍了MySQL中MVCC的应用及实现机制。MVCC是一种提高并发性能的技术,通过对事务内读取的内存进行处理,避免写操作堵塞读操作的并发问题。与其他数据库系统的MVCC实现机制不尽相同,MySQL的MVCC是在undolog中实现的。通过undolog可以找回数据的历史版本,提供给用户读取或在回滚时覆盖数据页上的数据。MySQL的大多数事务型存储引擎都实现了MVCC,但各自的实现机制有所不同。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • C++语言入门:数组的基本知识和应用领域
    本文介绍了C++语言的基本知识和应用领域,包括C++语言与Python语言的区别、C++语言的结构化特点、关键字和控制语句的使用、运算符的种类和表达式的灵活性、各种数据类型的运算以及指针概念的引入。同时,还探讨了C++语言在代码效率方面的优势和与汇编语言的比较。对于想要学习C++语言的初学者来说,本文提供了一个简洁而全面的入门指南。 ... [详细]
  • 本文介绍了H5游戏性能优化和调试技巧,包括从问题表象出发进行优化、排除外部问题导致的卡顿、帧率设定、减少drawcall的方法、UI优化和图集渲染等八个理念。对于游戏程序员来说,解决游戏性能问题是一个关键的任务,本文提供了一些有用的参考价值。摘要长度为183字。 ... [详细]
  • Spring框架《一》简介
    Spring框架《一》1.Spring概述1.1简介1.2Spring模板二、IOC容器和Bean1.IOC和DI简介2.三种通过类型获取bean3.给bean的属性赋值3.1依赖 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 本文介绍了在Oracle数据库中创建序列时如何选择cache或nocache参数。cache参数可以提高序列的存取速度,但可能会导致序列丢失;nocache参数可以避免序列丢失,但在高并发访问时可能导致性能问题。文章详细解释了两者的区别和使用场景。 ... [详细]
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • Jboss的EJB部署描述符standardjaws.xml配置步骤详解
    本文详细介绍了Jboss的EJB部署描述符standardjaws.xml的配置步骤,包括映射CMP实体EJB、数据源连接池的获取以及数据库配置等内容。 ... [详细]
author-avatar
小歆歆
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有