热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【源码篇】LinkedBlockingQueue源码超详细解读

文章目录⭐️导读🚩基本构造📖核心源码解读put方法polltake方法drainTo方法remove方法⚠️注意事项🚗应用场景⭐️导


文章目录

    • ⭐️导读
    • 🚩基本构造
    • 📖核心源码解读
      • `put`方法
      • `poll/take`方法
      • `drainTo`方法
      • `remove`方法
    • ⚠️注意事项
    • 🚗应用场景


⭐️导读

LinkedBlockingQueue和它的名字一样,它是一个由链表实现的有界阻塞队列,该队列按照先进先出的逻辑对队列进行排序。该队列使用了两把锁分别控制放入和取出,大大提高了并发效率,下面将对它的源码进行详细解读。


🚩基本构造

image-20220421143744032

阻塞队列本质上也可算是集合,因此最上层也实现了Collection接口,并提供一些集合的常用方法,AbstractQueue是提供Queue的基本实现,我们重点关注QueueBlockingQueue提供的api。

Queue类提供最基本的API

// 插入,失败返回异常
boolean add(E e);
// 插入,失败返回false
boolean offer(E e);
// 移除队列头部元素,队列为空异常
E remove();
// 移除队列头部元素,队列为空返回null
E poll();
// 查看头部元素,队列为空异常
E element();
// 查看头部元素,队列为空返回null
E peek();

BlockingQueue提供阻塞操作相关API

// 将元素插入队列,如果队列没有可用空间则等待
void put(E e);
// 将元素插入队列,如果队列没用可用空间则等待设定的时间
boolean offer(E e, long timeout, TimeUnit unit);
// 移除头部元素,如果没有可用则等待
E take();
// 移除头部元素,如果没有可用则等待设定的时间
E poll(long timeout, TimeUnit unit);
// 返回剩余可插入的元素数量
int remainingCapacity();
// 从队列中取出全部的元素并插入到指定集合中
int drainTo(Collection<? super E> c);
// 从队列中取出指定数量的元素并插入到指定集合中
int drainTo(Collection<? super E> c, int maxElements);

&#x1f4d6;核心源码解读

LinkedBlockingQueue分别使用了一个读锁和一个写锁来控制并发&#xff0c;并使用Condition来控制他们的执行过程

// 读锁
private final ReentrantLock takeLock &#61; new ReentrantLock();
// 队列不为空的Condition
private final Condition notEmpty &#61; takeLock.newCondition();
// 写锁
private final ReentrantLock putLock &#61; new ReentrantLock();
// 队列没有满的Condition
private final Condition notFull &#61; putLock.newCondition();

put方法

将元素插入队列&#xff0c;如果队列没有可用空间则等待

public void put(E e) throws InterruptedException {// 如果元素是null抛出异常if (e &#61;&#61; null) throw new NullPointerException();int c &#61; -1;Node<E> node &#61; new Node<E>(e);// 使用写锁final ReentrantLock putLock &#61; this.putLock;// 元素数量计数器final AtomicInteger count &#61; this.count;// 获得锁putLock.lockInterruptibly();try {// 判断队列是否已满,如果已满写线程等待while (count.get() &#61;&#61; capacity) {notFull.await();}// 将尾部元素指向当前元素enqueue(node);// 元素数量&#43;1并返回操作前的数量c &#61; count.getAndIncrement();// 如果元素数量没有满&#xff0c;则唤醒notFull.wait()&#xff0c;表示当前队列未满if (c &#43; 1 < capacity)notFull.signal();} finally {// 解锁putLock.unlock();}if (c &#61;&#61; 0)// 如果操作前元素数量为0&#xff0c;则通知写线程signalNotEmpty();
}

此处signalNotEmpty();就是通知被阻塞的读线程&#xff08;如take/poll方法&#xff09;&#xff0c;队列里有数据了&#xff0c;赶紧消费


poll/take方法

poll 查看头部元素&#xff0c;队列为空异常

take 移除并返回头部元素&#xff0c;如果没有可用则等待

public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x &#61; null;int c &#61; -1;// 等待纳秒数long nanos &#61; unit.toNanos(timeout);final AtomicInteger count &#61; this.count;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;// 加锁takeLock.lockInterruptibly();try {// 如果当前队列中没有元素则等待指定时长while (count.get() &#61;&#61; 0) {if (nanos <&#61; 0)// 等待超时&#xff0c;直接返回nullreturn null;nanos &#61; notEmpty.awaitNanos(nanos);}// 移除队列头中的节点并返回x &#61; dequeue();// 元素-1c &#61; count.getAndDecrement();// 如果队列中有数据&#xff0c;则通知其他线程该队列不为空if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 通知其他线程表示该队列未满if (c &#61;&#61; capacity)signalNotFull();return x;
}

此处signalNotFull();是通知阻塞的写入线程&#xff08;如put/offer&#xff09;&#xff0c;表示队列没满&#xff0c;可以写入

take逻辑与poll类似&#xff0c;只是等待策略不相同&#xff0c;take方法如下

public E take() throws InterruptedException {E x;int c &#61; -1;final AtomicInteger count &#61; this.count;final ReentrantLock takeLock &#61; this.takeLock;takeLock.lockInterruptibly();try {// 等待逻辑与poll不一样&#xff0c;此处表示如果没有数据则一直等待while (count.get() &#61;&#61; 0) {notEmpty.await();}x &#61; dequeue();c &#61; count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c &#61;&#61; capacity)signalNotFull();return x;
}

drainTo方法

从队列中取出全部的元素并插入到指定集合中

public int drainTo(Collection<? super E> c, int maxElements) {if (c &#61;&#61; null)throw new NullPointerException();if (c &#61;&#61; this)throw new IllegalArgumentException();if (maxElements <&#61; 0)return 0;boolean signalNotFull &#61; false;// 使用读锁final ReentrantLock takeLock &#61; this.takeLock;takeLock.lock();try {// 读取数量不能大于剩余元素数量int n &#61; Math.min(maxElements, count.get());// 从头部开始读取&#xff0c;h表示当前头节点Node<E> h &#61; head;int i &#61; 0;try {while (i < n) {Node<E> p &#61; h.next;c.add(p.item);p.item &#61; null;h.next &#61; h;h &#61; p;&#43;&#43;i;}return n;} finally {if (i > 0) {// 有读取出来元素&#xff0c;则更新最新头节点head &#61; h;// 队列是否还有空间signalNotFull &#61; (count.getAndAdd(-i) &#61;&#61; capacity);}}} finally {takeLock.unlock();if (signalNotFull)signalNotFull();}
}

remove方法

public boolean remove(Object o) {if (o &#61;&#61; null) return false;// 加锁fullyLock();try {// 遍历全部元素for (Node<E> trail &#61; head, p &#61; trail.next;p !&#61; null;trail &#61; p, p &#61; p.next) {if (o.equals(p.item)) {// 移除链接unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}

注意一下此处的加锁逻辑

void fullyLock() {putLock.lock();takeLock.lock();
}

可以看到&#xff0c;remove方法会将读写锁都上锁&#xff0c;并且会扫描整个链表&#xff0c;时间复杂度为O(n)&#43;悲观锁

一般情况下不建议使用remove方法&#xff0c;该方法性能较差&#xff0c;会阻塞所有核心逻辑。


⚠️注意事项

使用LinkedBlockingQueue时要额外注意影响性能的方法

如&#xff1a;remove/contains/toArray/toString/clear

以上方法的时间复杂度均为O(n)&#43;悲观锁&#xff0c;如非必要最好不要使用


&#x1f697;应用场景

LinkedBlockingQueue本质上就是个内存级队列&#xff0c;它同样可以达到削峰填谷的目的&#xff0c;使用得当可以给系统减轻不小的压力。


  1. 调度外部服务&#xff0c;防止调用过于频繁&#xff0c;可以放入队列中&#xff0c;等待消费&#xff0c;并用drainTo归集然后统一请求。
  2. 令牌桶&#xff0c;可以通过产生令牌和分发令牌的方式控制业务/接口最大并发。
  3. 使用对象池化技术来减轻jvm回收的压力&#xff0c;将池化对象放入队列中。

下面使用LinkedBlockingQueue实现一个对象池&#xff0c;使用对象池可以防止频繁创建/回收对象&#xff0c;减少gc次数&#xff0c;池化对象长期存储在老年代中&#xff0c;对象数量可控

ResourcePool 对象池抽象类&#xff0c;实现该类就能初始化一个对象池

public abstract class ResourcePool<T extends ResourceModel> {private final LinkedBlockingQueue<T> queue;public ResourcePool(int poolMax) {queue &#61; new LinkedBlockingQueue<>(poolMax);for (int i &#61; 0; i < poolMax; i&#43;&#43;) {T model &#61; createResource();model.pool &#61; this;model.invalid &#61; true;queue.add(model);}}public T getResource() {try {do {T t &#61; queue.take();if (t.invalid) {t.invalid &#61; false;return open(t);}} while (true);} catch (InterruptedException e) {throw new RuntimeException(e);}}protected T open(T t) {return t;}protected abstract T createResource();public void free(T t) {if (!t.invalid) {t.invalid &#61; true;queue.offer(close(t));}}protected T close(T t) {return t;}}

ResourceModel抽象对象

public abstract class ResourceModel implements Closeable {ResourcePool pool;boolean invalid;&#64;Overridepublic void close() throws IOException {pool.free(this);}
}

TestModel对象实例

&#64;Setter
public class TestModel extends ResourceModel {public TestModel(String name, int age) {this.name &#61; name;this.age &#61; age;}private String name;private int age;}

TestPool对象池实例

public class TestPool extends ResourcePool<TestModel> {public TestPool(int poolMax) {super(poolMax);}// 创建对象实例&#64;Overrideprotected TestModel createResource() {return new TestModel("", 0);}// 获得对象的前置操作&#64;Overrideprotected TestModel open(TestModel testModel) {return super.open(testModel);}// 对象回收后操作&#64;Overrideprotected TestModel close(TestModel testModel) {testModel.setAge(0);testModel.setName("");return super.close(testModel);}
}

使用方式1

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);// 从池中获得一个对象TestModel model &#61; testPool.getResource();// 回收对象model.close();
}

使用方式2

public static void main(String[] args) throws IOException {TestPool testPool &#61; new TestPool(30);try(TestModel model &#61; testPool.getResource()) {}
}

推荐阅读
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • LeetCode笔记:剑指Offer 41. 数据流中的中位数(Java、堆、优先队列、知识点)
    本文介绍了LeetCode剑指Offer 41题的解题思路和代码实现,主要涉及了Java中的优先队列和堆排序的知识点。优先队列是Queue接口的实现,可以对其中的元素进行排序,采用小顶堆的方式进行排序。本文还介绍了Java中queue的offer、poll、add、remove、element、peek等方法的区别和用法。 ... [详细]
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Whatsthedifferencebetweento_aandto_ary?to_a和to_ary有什么区别? ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
  • OO第一单元自白:简单多项式导函数的设计与bug分析
    本文介绍了作者在学习OO的第一次作业中所遇到的问题及其解决方案。作者通过建立Multinomial和Monomial两个类来实现多项式和单项式,并通过append方法将单项式组合为多项式,并在此过程中合并同类项。作者还介绍了单项式和多项式的求导方法,并解释了如何利用正则表达式提取各个单项式并进行求导。同时,作者还对自己在输入合法性判断上的不足进行了bug分析,指出了自己在处理指数情况时出现的问题,并总结了被hack的原因。 ... [详细]
  • ScrollView嵌套Collectionview无痕衔接四向滚动,支持自定义TitleView
    本文介绍了如何实现ScrollView嵌套Collectionview无痕衔接四向滚动,并支持自定义TitleView。通过使用MainScrollView作为最底层,headView作为上部分,TitleView作为中间部分,Collectionview作为下面部分,实现了滚动效果。同时还介绍了使用runtime拦截_notifyDidScroll方法来实现滚动代理的方法。具体实现代码可以在github地址中找到。 ... [详细]
  • 本文介绍了在iOS开发中使用UITextField实现字符限制的方法,包括利用代理方法和使用BNTextField-Limit库的实现策略。通过这些方法,开发者可以方便地限制UITextField的字符个数和输入规则。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • Java 11相对于Java 8,OptaPlanner性能提升有多大?
    本文通过基准测试比较了Java 11和Java 8对OptaPlanner的性能提升。测试结果表明,在相同的硬件环境下,Java 11相对于Java 8在垃圾回收方面表现更好,从而提升了OptaPlanner的性能。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
author-avatar
上海悠u7_
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有