热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

并发和多线程(二十)–LinkedBlockingQueue源码解析

目录类定义QueueBlockingQueue成员变量构造函数offer()put()take()enqueueanddequeuepeek()阻塞队列在日常开发中直接使用比较少,

目录



  • 类定义

    • Queue

    • BlockingQueue



  • 成员变量

  • 构造函数

  • offer()

  • put()

  • take()

  • enqueue and dequeue

  • peek()


阻塞队列在日常开发中直接使用比较少,但是在很多工具类或者框架中有很多应用,例如线程池,消息队列等。所以,深入了解阻塞队列也是很有必要的。所以这里来了解一下LinkedBlockingQueue的相关源码,从命名可以看出来是由链表实现的数据结构。


类定义

public class LinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
}

从上面可以看到继承Queue,实现BlockingQueue,我们介绍一下两个类里面方法的作用,为了方便记忆和对比放到表格里进行展示。


Queue






























作用方法1方法2区别
新增add()offer()add()队列满的时候抛出异常,offer()队列满的时候返回false
查看并删除remove()poll()remove()队列为空的时候抛出异常,poll()队列为空的时候返回null
查看不删除element()peek()element()队列为空的时候抛出异常,peek()队列为空的时候返回null

BlockingQueue

BlockingQueue顾名思义带有阻塞的队列,方法有所区别,下面方法包含了Queue,因为属于继承关系,下面表格方法名用序号代替。







































作用方法1方法2方法3方法4区别
新增add()offer()put()offer(E e, long timeout, TimeUnit unit)队列满的时候,1和2作用和queue相同,3会一直阻塞,4阻塞一段时间,返回false
查看并删除remove()poll()take()poll(long timeout, TimeUnit unit)队列为空,1和2没有变化,3会一直阻塞,4会阻塞一段时间,返回null
查看不删除element()peek()队列为空,1和2没有变化

成员变量

//链表的容量,默认Integer.MAX_VALUE
private final int capacity;
//当前存在元素数量
private final AtomicInteger count = new AtomicInteger();
//链表的head节点
transient Node head;
//链表的tail节点
private transient Node last;
//主要用于take, poll等方法的加锁
private final ReentrantLock takeLock = new ReentrantLock();
//主要用在取值的阻塞场景
private final Condition notEmpty = takeLock.newCondition();
//主要用于put, offer等方法的加锁
private final ReentrantLock putLock = new ReentrantLock();
//主要用在新增的阻塞场景
private final Condition notFull = putLock.newCondition();

//Node比较简单,一个item,还有指向下个节点,也就是单向链表
static class Node {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node next;
Node(E x) { item = x; }
}

构造函数

//默认队列存储的元素最大为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

//自定义capacity,初始化head、tail
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}

public LinkedBlockingQueue(Collection c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//加锁,因为是添加,肯定是putLock
putLock.lock();
try {
int n = 0;
//遍历集合,每次生成一个node,添加到链表尾部
for (E e : c) {
if (e == null)
throw new NullPointerException();
//每次判断新增的节点是否超过capacity,如果是,抛出异常
if (n == capacity)
throw new IllegalStateException("Queue full");
//将节点添加到队列tail
enqueue(new Node(e));
++n;
}
//设置当前元素个数count
count.set(n);
//finally解锁
} finally {
putLock.unlock();
}
}

offer()

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
//初始设置为-1,c <0,表示新增失败
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() enqueue(node);
c = count.getAndIncrement();
if (c + 1 notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

流程:



  1. 如果e为空,抛出异常。

  2. 如果当前队列已满,返回false。

  3. 将e封装成一个新的节点,加锁,putLock。

  4. 再次判断队列元素数量
  5. CAS将count+1,注意这里调用的是getAndIncrement返回的是+1之前的值。如果队列没满,唤醒某个某个因为添加而阻塞的线程。

  6. finally解锁,如果c == 0,加锁takeLock,唤醒继续添加。

  7. 返回 c >= 0。


put()

相对于offer(),put的代码会判断当前队列是否满了,如果满了,通过Condition阻塞,其他没啥区别。

在这里插入图片描述


take()

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//注意这里c是先获取,后-1的
if (c == capacity)
signalNotFull();
return x;
}

流程:



  1. 加锁,takeLock。

  2. 如果当前队列为空,直接通过notEmpty阻塞,等待被唤醒。

  3. 取出第一个元素,并删除元素。

  4. 如果c > 1,表示队列还有元素,唤醒别的线程获取。

  5. finally解锁,如果c == capacity,表示队列没满,加锁takeLock,唤醒继续添加。

  6. 返回 x。


enqueue and dequeue

private void enqueue(Node node) {
last = last.next = node;
}
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

这里统一讲一下在链表中添加和删除数据的流程,特别是dequeue(),我刚看第一眼的时候有点蒙蔽的,下面举个栗子。

BlockingQueue queue = new LinkedBlockingDeque<>();
queue.offer(1);
queue.offer(2);
queue.offer(3);
Integer take = queue.take();
System.out.println(take);

在这里插入图片描述

这里把每一步都画出来了,还是比较好理解的。其余方法的逻辑都比较相似,下面简单说一下。


peek()

peek()和take()的代码差不多,只是不会删除元素,take()通过dequeue(),而peek()通过一句代码Node first = head.next;获得该节点的数据然后返回。



推荐阅读
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • LeetCode笔记:剑指Offer 41. 数据流中的中位数(Java、堆、优先队列、知识点)
    本文介绍了LeetCode剑指Offer 41题的解题思路和代码实现,主要涉及了Java中的优先队列和堆排序的知识点。优先队列是Queue接口的实现,可以对其中的元素进行排序,采用小顶堆的方式进行排序。本文还介绍了Java中queue的offer、poll、add、remove、element、peek等方法的区别和用法。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 本文介绍了栈和队列的区别及其特点。栈是一种先进后出的线性表,只能在表的一端进行插入和删除操作;队列是一种先进先出的线性表,只能在表的一端进行插入和在另一端进行删除操作。栈和队列是两种广泛使用的线性数据结构,它们的基本操作具有特殊性。栈的遍历需要遍历整个栈才能取出数据,并需要为数据开辟临时空间,而队列基于地址指针进行遍历,可以从头或尾部开始遍历,但不能同时遍历,且无需开辟临时空间。栈和队列在程序设计中具有重要应用。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
author-avatar
克阳光沫沫的幸福
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有