无锁队列MpscQueue源码分析

前言

之前的文章在分析 NioEventLoop 源码的时候,有提到过 Netty 没有用 JDK 提供的阻塞队列,而是使用了高性能无锁队列 MpscQueue。因为篇幅原因,那篇文章并没有详细介绍 MpscQueue,今天,它来啦!!!

在 Netty 较早的版本中,使用的是自己实现的任务队列,后来全部替换为 JCTools 工具的无锁化队列了,为啥呢?没别的,因为它的效率实在是太高了。

何为 Mpsc???JCTools 提供了很多队列,大家需要针对不同的应用场景选择合适的队列,避免发生潜在的问题。这里解释一下「MSPC」的含义,如下:

  • M:Multiple,多个的。
  • S:Single,单个的。
  • P:Producer,生产者。
  • C:Consumer,消费者。

因此 MpscQueue 其实就是指:适用于多生产者,单消费者的高性能无锁队列!

之前的文章有说过,NioEventLoop 是个单线程的线程池,提交到 EventLoop 的任务会被线程串行化执行。因此 EventLoop 的任务队列的生产消费模型是:多生产者,单消费者。

所以本篇文章只会重点分析 MpscQueue,其他队列大家自行研究。

MpscQueue 源码分析

MpscQueue 不是 Netty 提供的,因此在 Netty 项目里是看不到它的源码的,为了阅读方便,还是建议大家去单独拉 JCTools 的源码,地址:https://github.com/JCTools/JCTools。

Netty 默认使用的队列是org.jctools.queues.MpscUnboundedArrayQueue,这里只分析它。MpscUnboundedArrayQueue 是一个适用于「多生产者单消费者」的无界队列,这意味着它没有容量限制,你可以不断的往里面提交任务,即便没有消费者消费数据。

MpscUnboundedArrayQueue 类的继承关系比较多,类图比较复杂,但是没关系,我们不用分析所有代码,只重点关注它的直接父类BaseMpscLinkedArrayQueue即可,核心逻辑都在这里了,看懂BaseMpscLinkedArrayQueue基本就知道它大概的一个实现思想了。无锁队列MpscQueue源码分析

这里说明一下,MpscQueue 的代码中存在大量类似如下代码:

byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b

读者不用过分关注,忽略它即可,这些属性没有什么特别的用处,就是做字节填充用的,这涉及到 CPU 硬件缓存 Cache Line,这里简单说下吧。

计算机除了有 Memory 主存,CPU 每个核心还有自己单独的缓存,这些缓存按等级分为一级缓存、二级缓存等等,距离 CPU 核心越近的缓存效率越高。为啥 CPU 要有缓存?就是因为 CPU 的计算速度实在是太快了,相比之下内存的读写速度实在是太慢了,为了填补二者速度上的鸿沟,CPU 被加入了多级缓存,CPU 会将主存中的数据进行缓存,然后进行运算,运算完成后会在未来的某个时刻写入到主存。

CPU 缓存数据的最小单位是 Cache Line,在大多数 CPU 上,它的大小是 64 字节。只要 Cache Line 中的任一数据失效,整个 Cache Line 就会被认为是失效的,需要从主存中重新加载。

因此,如果 Java 对象/属性被缓存到同一个 Cache Line 上了,那就有可能因为其他线程修改了这一块的某个数据,导致所有线程的 Cache Line 全部失效,进而导致所有线程重新从主存中 load,这会导致不必要的开销。

综上所述,MpscQueue 做了优化,将可能会被频繁读写的数据,分配到不同的 Cache Line,避免相互影响。


言归正传,回到 MpscUnboundedArrayQueue,先说一下它的一个实现思路吧。

MpscUnboundedArrayQueue 基本的数据结构由「数组+链表」组成,它有两个指针:producerBuffer 和 consumerBuffer,分别指向生产者生产和消费者消费对应的数组。它还有两个索引指针:producerIndex 和 consumerIndex,分别代表生产者生产和消费者消费的索引,这两个索引会以 2 为步长不断递增。另外对于生产者,它还有一个 producerLimit 指针,它代表生产者生产消息的上限,达到该上限,Queue 就要扩容了,扩容的方式是创建一个长度一样的新数组,然后旧数组的最后一个元素指向新数组,形成单向链表。

笔者画了一个简图,描述了 MpscQueue 的数据结构变化:无锁队列MpscQueue源码分析

整体流程说的差不多了,下面开始分析源码,先看几个比较重要的属性:

// 生产者索引
private volatile long producerIndex;
// 元素生产的限制,当producerIndex == producerLimit,代表队列需要扩容
private volatile long producerLimit;
protected long producerMask;
// 当前生产者指向的数组
protected E[] producerBuffer;

// 消费者索引
private volatile long consumerIndex;
protected long consumerMask;
// 当前消费者指向的数组
protected E[] consumerBuffer;

// 数组被生产者填满后,会填充一个JUMP,代表队列扩容了,消费者遇到JUMP会消费下一个数组。
private static final Object JUMP = new Object();

// 消费者消费完一个完整的数组后,会将最后一个元素设为BUFFER_CONSUMED。
private static final Object BUFFER_CONSUMED = new Object();

再看构造函数,需要给定一个 chunkSize,指定块大小,MpscQueue 由一系列数组构成,chunkSize 就是数组的大小,它必须是一个 2 的幂次方数。

public MpscUnboundedArrayQueue(int chunkSize) {
    super(chunkSize);
}

在父类的构造函数中,计算了 mask,初始化了一个数组,并将 producerBuffer 和 consumerBuffer 都指向了同一个数组,然后根据 mask 设置 producerLimit。

假设 initialCapacity 为 8,数组的长度就是 9,因为最后一个元素会用来存放扩容数组的地址,形成链表。每个数组还会预留一个槽位存放JUMP元素,代表队列扩容了,消费者遇到JUMP元素就会通过最后一个元素找到扩容后的数组继续消费,因此一个数组最多保留 7 个元素。

/**
 * 初始化
 * @param initialCapacity 数组容量,要求是2的幂次方数
 */

public BaseMpscLinkedArrayQueue(final int initialCapacity) {
 // initialCapacity必须大于等于2
 RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2"initialCapacity");

 // 容量确保是2的幂次方数,找到initialCapacity下一个2的幂次方数
 int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);

 // index以2为步长递增,预留一个元素存JUMP,所以limit为:(capacity-1)*2
 long mask = (p2capacity - 1) << 1;
 // need extra element to point at next array
 // 需要一个额外元素来链接下一个数组
 E[] buffer = allocateRefArray(p2capacity + 1);
 // 生产者和消费者Buffer指向同一个数组
 producerBuffer = buffer;
 producerMask = mask;
 consumerBuffer = buffer;
 consumerMask = mask;
 // 设置producerLimit = mask
 soProducerLimit(mask);
}

Queue 初始化完成后,就是不断的生产数据和消费数据了,所以接下来重点分析offer()poll()方法。

offer()分析

offer(e)会将元素 e 添加到队列中,即生产数据。在 MpscQueue 中,队列是不能存放空数据的,所以首先会检查非空。然后线程通过 CAS 的方式以步长为 2 递增 producerIndex,CAS 会保证只有一个线程操作成功,CAS 成功就代表线程抢到了数组中的槽位,它可以将元素 e 添加到数组的指定槽位。CAS 失败代表并发失败了,会自旋重试。

上面说的是 producerIndex 还没有达到 producerLimit 的情况,如果达到 producerLimit,代表生产达到上限,队列可能需要扩容了。offerSlowPath()方法会判断队列是否需要扩容,如果需要扩容,也只会交给一个线程去扩容,这里又是一个 CAS 操作,线程以 1 为步长递增 producerIndex,只有 CAS 成功的线程才会去执行扩容逻辑。

因此,在offer(e)的逻辑中,还会判断 producerIndex 是否是奇数,如果为奇数就代表队列正在扩容。因为 MpscQueue 的扩容非常快速,它不需要迁移元素,只需要创建一个新数组,再和旧数组建立连接就可以了,所以没有必要让其他线程挂起,线程发现队列在扩容时,会进行自旋重试,直到扩容完成。

/**
 * 向队列中添加一个元素e,生产数据
 * @param e
 * @return
 */

@Override
public boolean offer(final E e) {
 if (null == e) { // 非空校验
  throw new NullPointerException();
 }

 long mask;
 E[] buffer;//生产者指向的数组
 long pIndex;//生产索引

 while (true) {
  long producerLimit = lvProducerLimit();
        // 获取生产者索引
  pIndex = lvProducerIndex();
  // 生产索引以2为步长递增,一般不会是奇数,在offerSlowPath()中扩容线程会将其设为奇数
  if ((pIndex & 1) == 1) {
   // 奇数代表正在扩容,自旋,等待扩容完成
   continue;
  }

  mask = this.producerMask;
  buffer = this.producerBuffer;
  // 生产索引达到producerLimit,代表可能需要扩容。
  if (producerLimit <= pIndex) {
   int result = offerSlowPath(mask, pIndex, producerLimit);
   switch (result) {
    case CONTINUE_TO_P_INDEX_CAS:
     //producerLimit虽然达到了limit,但是当前数组已经被消费了部分数据,暂时不会扩容,会使用已被消费的槽位。
     break;
    case RETRY://CAS失败,重试
     continue;
    case QUEUE_FULL://队列满,offer失败
     return false;
    case QUEUE_RESIZE://需要扩容
     resize(mask, buffer, pIndex, e, null);
     return true;
   }
  }

  if (casProducerIndex(pIndex, pIndex + 2)) {
   // CAS递增producerIndex成功,抢到槽位,跳出自旋
   break;
  }
 }
 final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
 // 将buffer数组的指定位置替换为e,不是根据下标来设置的,是根据槽位的地址偏移量offset,UNSAFE操作。
 soRefElement(buffer, offset, e); // release element e
 return true;
}

offerSlowPath()会告诉线程队列是满了,还是需要扩容,还是需要自旋重试。虽然 producerIndex 达到了 producerLimit,但不代表队列就非得扩容,如果消费者已经消费了部分生产者指向的数组元素,就意味着当前数组还是有槽位可以继续用的,暂时不用扩容。

/**
 * @param mask
 * @param pIndex 生产者索引
 * @param producerLimit 生产者limit
 * @return
 */

private int offerSlowPath(long mask, long pIndex, long producerLimit) {
 // 消费者索引
 final long cIndex = lvConsumerIndex();
 // 数组缓冲的容量,(长度-1) * 2
 long bufferCapacity = getCurrentBufferCapacity(mask);

 // 消费索引+当前数组的容量 > 生产索引,代表当前数组已有部分元素被消费了,不会扩容,会使用已被消费的槽位。
 if (cIndex + bufferCapacity > pIndex) {
  if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
   // CAS失败,自旋重试
   return RETRY;
  } else {
   // 重试CAS修改 生产索引
   return CONTINUE_TO_P_INDEX_CAS;
  }
 }
 // 根据生产者和消费者索引判断Queue是否已满,无解队列永不会满
 else if (availableInQueue(pIndex, cIndex) <= 0) {
  return QUEUE_FULL;
 }
 // grab index for resize -> set lower bit
 // CAS的方式将producerIndex加1,奇数代表正在resize
 else if (casProducerIndex(pIndex, pIndex + 1)) {
  return QUEUE_RESIZE;
 } else {
  // resize失败,重试
  return RETRY;
 }
}

如果需要扩容,线程会 CAS 操作将 producerIndex 改为奇数,让其它线程能感知到队列正在扩容,要生产数据的线程先自旋,等待扩容完成再继续操作。

resize()是扩容的核心方法,它首先会创建一个相同长度的新数组,将 producerBuffer 指向新数组,然后将元素 e 放到新数组中,旧元素的最后一个元素指向新数组,形成链表。还会将旧元素的槽位填充JUMP元素,代表队列扩容了。

// 扩容:新建一个E[],将oldBuffer和newBuffer建立连接。
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
 assert (e != null && s == null) || (e == null || s != null);
 // 下一个Buffer的长度,MpscQueue会构建一个相同长度的Buffer
 int newBufferLength = getNextBufferSize(oldBuffer);
 final E[] newBuffer;
 try {
  // 创建一个新的E[]
  newBuffer = allocateRefArray(newBufferLength);
 } catch (OutOfMemoryError oom) {
  assert lvProducerIndex() == pIndex + 1;
  soProducerIndex(pIndex);
  throw oom;
 }

 // 生产者Buffer指向新的E[]
 producerBuffer = newBuffer;
 // 计算新的Mask,Buffer长度不变的情况下,Mask也不变
 final int newMask = (newBufferLength - 2) << 1;
 producerMask = newMask;

 // 根据该偏移量设置oldBuffer的JUMP元素,会递增然后重置,不断循环
 final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
 // Mask不变的情况下,oldBuffer的JUMP对应的位置,就是newBuffer中要消费的位置.
 final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

 // 元素e放到新数组中
 soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
 // 旧数组和新数组建立连接,旧数组的最后一个元素保存新数组的地址。
 soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);

 // 消费者索引
 final long cIndex = lvConsumerIndex();
 // 根据消费者和生产者索引,校验Queue是否已满。对于无界队列,返回Integer.MAX_VALUE,永远都不会满。
 final long availableInQueue = availableInQueue(pIndex, cIndex);
 RangeUtil.checkPositive(availableInQueue, "availableInQueue");

 // 设置新的producerLimit
 soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

 /*
 扩容的时候会将producerIndex设为pIndex+1,奇数代表正在扩容,非扩容线程会自旋重试,等待扩容完成。
 现在元素已经放入队列,将producerIndex设为pIndex+2,让其他线程知道扩容完成。
  */

 soProducerIndex(pIndex + 2);

 /*
 将旧数组的指定位置设为JUMP,消费者遇到JUMP就知道队列扩容了,会寻找next连接的数组。
  */

 soRefElement(oldBuffer, offsetInOld, JUMP);
}

offer()主要流程就这样,CAS 抢槽位,确保只有单个线程能生产,CAS 失败的线程自旋重试。如果遇到队列需要扩容,则将 producerIndex 设为奇数,其他线程自旋等待扩容完成,扩容后再设为偶数,通知其它线程继续生产。

poll()分析

元素生产好了,就是为了调用poll()来进行消费的。

poll()首先还是找到 consumerBuffer 指向的当前消费数组,根据消费索引 consumerIndex 计算要消费的元素相较于 Array 的内存地址偏移量,根据这个偏移量来获取元素。

如果元素为 null,并不代表队列是空的,还要比较 consumerIndex 和 producerIndex,如果两者索引不同,那么 producerIndex 肯定是大于 consumerIndex 的,说明生产者已经在生产了,移动了 producerIndex,只是还没来得及将元素填充到数组而已。因为生产者是先 CAS 递增 producerIndex,再将元素填充到数组的,两步之间存在一个非常短的时间差,如果消费者恰好在这个时间差内去消费数据,那么就自旋等待一下,等待生产者填充元素到数组。

如果元素为JUMP,说明队列扩容了,消费者需要根据数组的最后一个元素找到扩容后的新数组,消费新数组的元素。

// MpscQueue是多生产单消费者的Queue,因此poll()没有做并发控制。
@Override
public E poll() {
 /*
 consumerBuffer和producerBuffer会在Queue的构造函数中被初始化,
 初始化时,两者会指向同一个数组。随着生产者不断生产数据,Queue扩容,producerBuffer会慢慢指向新的数组。
  */

 final E[] buffer = consumerBuffer;
 // 消费者索引
 final long index = lpConsumerIndex();
 final long mask = consumerMask;

 // 计算消费者需要消费的元素在数组中的地址偏移量
 final long offset = modifiedCalcCircularRefElementOffset(index, mask);
 // 根据offset取出元素e
 Object e = lvRefElement(buffer, offset);
 if (e == null) {
  if (index != lvProducerIndex()) {
   /*
   offer()时生产者先CAS改producerIndex,再设置元素。
   中间会有一个时间差,此时会自旋,等待元素设置完成。
    */

   do {
    e = lvRefElement(buffer, offset);
   }
   while (e == null);
  } else {//元素已经消费完
   return null;
  }
 }

 if (e == JUMP) {// 代表队列扩容了
  /*
  通过当前数组的最后一个元素,获取下一个待消费的数组,
  同时,消费者还会将最后一个元素设为BUFFER_CONSUMED,代表当前数组已经消费完毕。
   */

  final E[] nextBuffer = nextBuffer(buffer, mask);
  // 从新数组中消费元素
  return newBufferPoll(nextBuffer, index);
 }

 // 取出元素后,将原来的槽位设为null
 soRefElement(buffer, offset, null);
 // 递增consumerIndex
 soConsumerIndex(index + 2);
 return (E) e;
}

如果队列扩容了,nextBuffer()会找到扩容后的新数组,同时它还会将旧数组的最后一个元素设为BUFFER_CONSUMED,代表当前数组已经被消费完了,也就从链表中剔除了。

// 找到扩容后的新数组
private E[] nextBuffer(final E[] buffer, final long mask) {
    // 计算数组最后一个元素的地址偏移量
    final long offset = nextArrayOffset(mask);
    // 找到下一个数组
    final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
    // 消费者Buffer指向新数组
    consumerBuffer = nextBuffer;
    // 重新计算Mask,数组长度不变的则Mask不变
    consumerMask = (length(nextBuffer) - 2) << 1;
    // 将旧数组的最后一个元素设为BUFFER_CONSUMED,代表消费完毕。
    soRefElement(buffer, offset, BUFFER_CONSUMED);
    return nextBuffer;
}

得到新数组后,会调用newBufferPoll()从新数组中消费数据:

// 从扩容后的新数组里消费数据,索引下标不变
private E newBufferPoll(E[] nextBuffer, long index) {
 // 根据consumerIndex计算要消费的元素相较于Array的内存偏移量
 final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
 // 根据offset取出这个元素
 final E n = lvRefElement(nextBuffer, offset);
 if (n == null) {//offer()的元素不可能为null,一般不会进这个if
  throw new IllegalStateException("new buffer must have at least one element");
 }
 // 元素取出后将那个那个槽位设为null
 soRefElement(nextBuffer, offset, null);
 // 递增consumerIndex
 soConsumerIndex(index + 2);
 return n;
}

消费者取出数据后,会将数组原来的槽位中填充 null,其实也就代表这个槽位没有使用,可以被复用。

至此,poll()的消费流程也全部分析结束了,可以看到,全程都没有挂起线程,顶多就是自旋等待。

总结

MpscQueue 是一个「多生产者单消费者」的高性能无锁队列,符合 Netty EventLoop 的任务消费模型。它用到了大量的 CAS 操作,对于需要做并发控制的地方,确保只有一个线程会执行成功,其他 CAS 失败的线程会自旋重试,全程都是无锁非阻塞的。不管是扩容,还是等待元素被填充到数组,这些过程都是会极快完成的,因此短暂的自旋会比线程挂起再唤醒效率更高。MpscQueue 由一系列数组构成,数组的最后一个元素指向下一个数组,形成单向链表。数组扩容后会在原槽位填充JUMP元素,消费者遇到该元素就知道要寻找新数组继续消费了。

MpscQueue 全程无锁化,非阻塞,相较于 JDK 提供的同步阻塞队列,性能有很好的提升,这也是 Netty 后来的版本将任务队列替换为 JCtools 的重要原因。


原文始发于微信公众号(程序员小潘):无锁队列MpscQueue源码分析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29388.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!