大家好,我是快卷不动了的七哥。
这篇文章理解起来不难,相比于 ConcurrentHashMap 比较简单,因为不涉及扩容以及数据迁移等操作,相信你读完一定会有收获的。
本文主角就是 java 并发包中提供的 ConcurrentLinkedQueue
这是一个线程安全且无界的队列(因为是基于链表实现的,所以无界) ,在并发编程中经常需要用到线程安全的队列,面试线程池时,其中的队列也可以采用这个队列来实现,它线程安全,且采用先进先出的规则排序。
通过整个并发系列文章的学习,我们能想到如果要实现一个线程安全的队列那么有两种方式:一种是使用阻塞方式,即锁的形式,在出队、入队方法加上 synchronized 或者获取 lock 锁 等方式实现。另一种是非阻塞方式,使用自旋CAS的方式来实现。而 Java 并发包中你会看到 Concurrent 开头的类都是支持并发的,也就是非阻塞的。
这篇文章我们一起来从源码分析下 并发大师 Doug Lea 是如何使用非阻塞的方式实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师身上,我们能学到不少并发技巧。
ConcurrentLinkedQueue 结构
通过 ConcurrentLinkedQueue 的类图来分析一下它的结构:

可以看出,ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点,即子类 Node 由 节点属性 item 和 next(指向下一个节点Node的引用)组成。节点之间通过 next 关联,从而组成一张链表。
入队即每次将元素包装成节点放到链表结尾,出队从链表头删除一个元素返回。
了解了整体结构,你应该也看出来了,并发队列 ConcurrentLinkedQueue
最重要的就是俩操作,入队和出队,接下来我们直接从源码层面学习。
入队操作
入队过程其实就是将入队节点添加到队列尾部,这个入队操作,其实 Doug Lea 大师做了一些优化,为了一会看源码更加清晰,这里先看一组入队的过程图,直观的了解一下优化点。假设现在要插入四个节点:

通过上面入队的操作,观察到 head 和 tail 节点的变化,总结其实就是干了两件事:
-
第一是将入队节点设置到当前队尾节点的 next 节点上; -
第二就是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置为 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设为 tail 节点的 next 节点。
也就是说 tail 节点并不一定是尾结点,一定要清楚记着这一点,这对理解下面的入队源码非常有用。
下面我就不多说了,直接看代码,理解上面的描述,再结合代码注释,相信你一定能看懂:
// 将指定的元素插入到此队列的末尾,因为队列是无界的,所以这个方法永远不会返回 false 。
public boolean offer(E e) {
checkNotNull(e);
// 入队前,创建一个入队节点
final Node<E> newNode = new Node<E>(e);
// 死循环,入队不成功反复入队
// 创建一个指向tail节点的引用,p用来表示队列的尾节点,默认情况下等于tail节点
for (Node<E> t = tail, p = t;;) {
// 获得p节点的下一个节点
Node<E> q = p.next;
// next节点为空,说明p是尾节点
if (q == null) {
// p is last node
// p是尾结点,则设置p节点的next节点为入队节点
if (p.casNext(null, newNode)) {
// 首先要知道入队操作不是每次都设置tail节点为尾结点,为了减少CAS操作提高性能,也就是说tail节点不总是尾节点
// 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,
// p != t 这个条件者结合下面 else 分支看,下面在冲突的时候会修改 p 指向 p.next,所以导致p不等于 tail,
// 即tail节点有大于等于1个的next节点
if (p != t) // hop two nodes at a time
// 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,
// 这里允许失败,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 这个分支要想进去,即 p 节点等于 p.next 节点,只有一种可能,就是 p 节点和 p.next 节点都为空
// 表示这队列刚初始化,正准备添加节点,所以需要返回head
else if (p == q)
// 如果tail变了,说明被其他线程添加成功了,则 p 取新的 tail,否则 p 从 head 开始
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 进行这个分支说明next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
// 执行 p != t 说明尾结点不等于tail,t != (t = tail)) 说明tail做了变动,
// 同时满足说明tail已经重新设置了,即结尾就是tail,否则尾结点取tail.next
p = (p != t && t != (t = tail)) ? t : q;
}
}
从源代码角度来看,整个入队过程主要做两件事情:
-
第一是定位出尾节点; -
第二是使用 CAS 算法将入队节点设置成尾节点的 next 节点,如不成功则重试。
这里我们思考一下,上面我们分析出入队操作在先进先出队列中就是将其设置为尾结点, Doug Lea 大师的代码写的有点复杂,我们可以不可以用下面的代码来替代呢?
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (;;) {
Node<E> t = tail;
if (t.casNext(null, n) && casTail(t, n)) {
return true;
}
}
}
上面的代码每次入队都将 tail 设置为尾结点,这样能节省很多的代码量,并且更加容易理解。
不过这样做的缺点就是每次都要使用循环 CAS 来进行设置 tail 节点。如果能减少 CAS 更新 tail 节点则能提高入队的效率。
但是我们同样要考虑到,由于并不是 tail 一定等于尾结点,所以在入队定位末尾节点时就要多一次循环操作。
但是这样效率还是高的,因为 tail 节点它是 volatile 变量,本质上来看是通过增加 volatile 变量的读来减少 volatile 变量的写,而 对于 volatile 写的开销是远远大于读操作的,所以入队效率会提升。大神对于性能的追求,真是到了极致,源码读起来还是有用的吧!!
出队操作
说到出队操作,你肯定会想到,出队是不是也要减少更新 head 节点,而直接弹出 队首元素 从而减少 CAS 更新操作以提升性能呢?
带着这个问题,我们一起往下看,实现为了便于读者理解,这里还是先放一组出队操作的快照图。

从图中得知,并不是每次出队都会 更新 head 节点,如果 head 节点的有元素时,则直接弹出 head 中的元素,清空该节点对元素的引用。如果 head 节点中元素为空,才会更新 head 节点。
有了大概的理解,然后就可以去读源码来分析了:
// 从队头出队
public E poll() {
restartFromHead:
for (;;) {
// p 表示头节点,需要出队的节点
for (Node<E> h = head, p = h, q;;) {
// 获取p节点的元素
E item = p.item;
// 如果头节点p中元素不为空,则进行CAS清空p节点对元素的应用,返回p节点的元素
if (item != null && p.casItem(item, null)) {
// CAS 设为成功后进入到这里,需要判断头节点p是否和head节点不是同一个了,即头节点p已经变更了
if (p != h) // hop two nodes at a time
// 更新head节点,将p节点的next节点设为head节点,如果p.next不为空则设置p.next,否则设置p本身
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 到这说明头节点p中的元素为null或者发生冲突CAS失败,CAS失败也说明被别的线程取走了当前元素,所以就该取一个节点即next节点
// 如果队列头节点p的next节点为空,说明队列已空,将则head设为p,返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 进到这里说明 (q = p.next) == null 返回false,即p的next不为空
// 同时q=p.next,而这个分支是说p=p.next,只有队列初始化时满足条件,两者都为空,则返回head,重头开始赋值
else if (p == q)
continue restartFromHead;
else
// 说明p节点的next不为空,且队列不是初始化状态,所以头节点p指向p.next
p = q;
}
}
}
首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
出队操作时,也是当 head 节点不等于 头节点 p 时,再次出队,才会将 head 设置为 最新的队头节点,减少了 CAS 操作,提升了效率。
总结
-
ConcurrentLinkedQueue 无界是因为结构是用链表组成的,天生无界,当然受到系统资源大小限制; -
ConcurrentLinkedQueue 在入队和出队时,均采用了减少 CAS 更新 head 和 tail 的操作,提升了性能; -
ConcurrentLinkedQueue 采用非阻塞模式实现,即无锁,通过自旋和 CAS 实现线程安全;
今天学习的并发包中线程安全的无界队列 ConcurrentLinkedQueue 源码也不难,相信会让你有更深的了解,方便以后在工作中使用和应付面试。
笔者水平有限,文章难免会有纰漏,如有错误欢迎一起交流探讨,我会第一时间更正的。都看到这里了,码字不易,可爱的你记得 “点赞” 哦,我需要你的正向反馈。
原文始发于微信公众号(七哥聊编程):并发编程 9:无界线程安全队列ConcurrentLinkedQueue源码解析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37116.html