Java JUC ConcurrentLinkedQueue解析

介绍

ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层使用单向链表实现,对于入队和出队操作使用 CAS 实现线程安全。

Java JUC ConcurrentLinkedQueue解析
类图

ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile类型的Node节点分别用来存放队列的头、尾节点。

下面无参构造函数中可以知道,默认头尾节点都是指向 item 为 null 的哨兵节点,新元素插入队尾,获取元素从头部出队。

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

在 Node 节点中维护一个使用 volatile 修饰的变量 item,用来存放节点的值;next 存放链表的下一个节点;其内部则使用 UNSafe 工具类提供的 CAS 算法来保证出入队时操作链表的原子性。

offer 操作

offer 操作是在队尾增加一个元素,如果传递的参数是 null 则抛出异常,否则由于 ConcurrentLinkedQueue 是无界队列,该方法会一直返回 true,另外,由于使用的是 CAS 算法,所以该方法不会被阻塞挂起调用方线程。下面我们看源码:

public boolean offer(E e) {
        //(1)为null则抛出异常
        checkNotNull(e);
       //(2)构造Node节点,内部调用unsafe.putObject
        final Node<E> newNode = new Node<E>(e);
        //(3)从尾节点插入
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            //(4)如果q == null则说明p是尾节点,执行插入
            if (q == null) {
                //(5)使用CAS设置p节点的next节点
                if (p.casNext(null, newNode)) {
                    //(6)如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
            }
            else if (p == q)
                //(7)多线程操作时,由于poll操作移除元素后可能会把head变成自引用,也就是head的next变成了head,这里需要重新找新的head
                p = (t != (t = tail)) ? t : head;
            else
                //(8)寻找尾节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
}

我们解析一下这个方法的执行流程,首先当调用该方法时,代码(1)对传参进行检查,为 null 则抛出异常,否则执行代码(2)并使用 item 作为构造函数参数创建一个新的节点,然后代码(3)从队列尾部节点开始循环,打算从尾部添加元素,到代码(4)时队列状态如下图。

Java JUC ConcurrentLinkedQueue解析
队列状态

这时候节点 p、t、head、tail 都指向 item 为 null 的哨兵节点,由于哨兵节点 next 节点为 null,所以 q 也是 null。

代码(4)发现 q == null 则执行代码(5),使用 CAS 判断 p 节点的 next 是否为 null,为 null 则使用 newNode 替换 p 的 next 节点,然后执行代码(6),但这时候 p == t 所以没有设置尾部节点,然后退出 offer 方法。目前队列状态如下图:

Java JUC ConcurrentLinkedQueue解析
队列状态

刚才我们讲解的是一个线程调用 offer 方法的情况,但是如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。

假设线程 1 调用了 offer(item1),线程 2 调用了 offer(item2),都同时执行到了代码(5),p.casNext(null,newNode)。由于 CAS 是原子性的,我们假设线程 1 先执行 CAS 操作,发现 p.next 为 null,则更新为 item1,这时候线程 2 也会判断 p.next 是否为 null,发现不为 null,则跳转到代码(3),然后执行代码(4)。这时队列分布如下图:

Java JUC ConcurrentLinkedQueue解析
队列分布

随后线程 2 发现不满足代码(4),则跳转执行代码(8),然后把 q 赋值给了 p。队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

然后线程 2 再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图:

Java JUC ConcurrentLinkedQueue解析
队列状态

这时候q == null,所以线程 2 会执行代码(5),通过 CAS 操作判断 p.next 节点是否为 null,是 null 则使用 item2 替换,不是则继续循环尝试。假设 CAS 成功,那么执行代码(6),由于p != t,所以设置 tail 节点为 item2,然后退出 offer 方法。这时候队列分布如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

到现在我们就差代码(7)没有讲过,其实在这一步主要是在执行 poll 操作后才会执行。在执行 poll 可能会发生如下情况:

Java JUC ConcurrentLinkedQueue解析
队列状态

我们分析一下如果是这种状态时调用 offer 添加元素,在执行到代码(4)时状态图如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

这里由于 q 节点不为空并且p == q所以执行到代码(7)由于t == tail所以 p 被赋值为 head,然后重新循环,循环后执行到代码(4),队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

这时候由于q == null,所以执行代码(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p.next 节点被设置为新增节点。然后执行代码(6),由于p != t所以设置新节点为队列的尾部节点,现在队列状态如下图:

Java JUC ConcurrentLinkedQueue解析
队列状态

📢 需要注意,这里自引用节点会被垃圾回收掉

总结:offer 操作的关键步骤是代码(5),通过 CAS 操作来控制同一时刻只有一个线程可以追加元素到队列末尾,而失败的线程会进行循环尝试 CAS 操作,直到 CAS 成功。

add 操作

add 操作是在链表末尾添加一个元素,内部还是调用的 offer 方法。

public boolean add(E e) {
    return offer(e);
}

poll 操作

poll 操作是在队列头部获取并移除一个元素,如果队列为空则返回 null。下面看看该方法的实现原理。

public E poll() {
    //(1)goto标记
    restartFromHead:
    //(2)无限循环
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //(3)保存当前节点
            E item = p.item;
       //(4)当前节点有值切CAS变为null
            if (item != null && p.casItem(item, null)) {
                //(5)CAS成功后标记当前节点并移除
                if (p != h) // hop two nodes at a time
                    //更新头节点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
           //(6)当前队列为空则返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(7)如果当前节点被自引用,则重新寻找头节点
            else if (p == q)
                continue restartFromHead;
            else  //(8)如果下一个元素不为空,则将头节点的下一个节点设置成头节点
                p = q;
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

由于 poll 方法是从头部获取一个元素并移除,所以代码(2)内层循环是从 head 节点开始,代码(3)获取当前队列头节点,队列一开始为空时队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

由于 head 节点指向的 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null,这时队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

随后执行 updateHead 方法,由于h == p,所以没有设置头节点,然后返回 null。

假设执行到代码(6)时,已经有其它线程调用了 offer 方法并成功添加了一个元素到队列,这时候 q 指向的是新增加的元素节点,如下图:

Java JUC ConcurrentLinkedQueue解析
队列状态

然后不满足代码(6)(q = p.next) == null,执行代码(7)的时候 p != q则执行代码(8),然后将 p 指向了节点 q,队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

然后程序又去执行代码(3),p 现在指向的不为 null,则执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时 CAS 成功,执行代码(5),此时p != h则设置头节点为 p,并且设置 h 的 next 节点为自己,poll 然后返回被移除的节点 item。队列图如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

目前的队列状态就是我们刚才讲 offer 操作时,执行到代码(7)的状态。

现在我们还有代码(7)没执行过,我们看一下什么时候执行。假设线程 1 执行 poll 操作时,队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

那么执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null,假设 CAS 设置成功则标记该节点并从队列中将其移除。队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

然后,由于p != h,所以会执行 updateHead 方法,假如线程 1 执行 updateHead 前另外一个线程 2 开始 poll 操作,这时候线程 2 的 p 指向 head 节点,但是还没有执行到代码(6),这时候队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

然后线程 1 执行 updateHead 操作,执行完毕后线程 1 退出,这时候队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

然后线程 2 继续执行代码(6), q = p.next,由于该节点是自引用节点,所以p == q,所以会执行代码(7)跳到外层循环 restartFromHead,获取当前队列头 head,现在的状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

总结:poll 方法在移除一个元素时,只是简单地使用 CAS 操作把当前节点的 item 值设置为 null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。

peek 操作

peek 操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回 null。下面看下其实现原理。

public E peek() {
   //1.
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //2.
            E item = p.item;
            //3.
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //4.
            else if (p == q)
                continue restartFromHead;
            //5.
            else
                p = q;
        }
    }
}

peek 操作的代码结构与 poll 操作类似,不同之处在于代码(3)中少了 castItem 操作。其实这很正常,因为 peek 只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次执行 peek 时在代码(3)中会发现item == null,然后执行q = p.next,这时候 q 节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。

当队列为空时,队列如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

这时候执行 updateHead,由于 h 节点等于 p 节点,所以不进行任何操作,然后 peek 操作会返回 null。

当队列中至少有一个元素时(假设只有一个),队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

这时候执行代码(5), p 指向了 q 节点,然后执行代码(3),此时队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

执行代码(3)时发现 item 不为 null,所以执行 updateHead 方法,由于h != p,所以设置头节点,设置后队列状态如下:

Java JUC ConcurrentLinkedQueue解析
队列状态

最后剔除哨兵节点。

总结:peek 操作的代码与 poll 操作类似,只是前者只获取队列头元素但是并不从队列里将它删除,而后者获取后需要从队列里面将它删除。另外,在第一次调用 peek 操作时,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者为 null

size 操作

计算当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁,所以从调用 size 方法到返回结果期间有可能增删元素,导致统计的元素个数不精确。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
//获取第一个队列元素,(剔除哨兵节点),没有则为null
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

remove 操作

如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回 true,否则返回 false。

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
              // 若不匹配,则获取next节点继续匹配
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
              //相等则使用CAS设置为null
                removed = p.casItem(item, null);
            }
            // 获取删除节点的后继节点
            next = succ(p);
            //如果有前驱节点,并且next节点不为null,则链接前驱节点到next节点
            if (pred != null && next != null// unlink
              // 将被删除的节点通过CAS移除队列
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

contains 操作

判断队列里面是否含有指定对象,由于是遍历整个队列,所以像 size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回 false。

public boolean contains(Object o) {
    if (o == nullreturn false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}

总结

ConcurrentLinkedQueue 的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个 Node 节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个 item 为 null 的哨兵节点。第一次执行 peek 或者 first 操作时会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer、poll 或者 remove 操作,导致计算的元素个数不精确,所以在并发情况下 size 方法不是很有用。


原文始发于微信公众号(Java菜鸟程序员):Java JUC ConcurrentLinkedQueue解析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/20988.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!