背景
寻找更高性能的并发队列
JDK内置的线程安全队列
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded(有界的) | 加锁 | arraylist(数组) |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist(链表) |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist(链表) |
LinkedTransferQueue | unbounded | 无锁 | linkedlist(链表) |
PriorityBlockingQueue | unbounded | 无锁 | heap |
ConcurrentLinkedQueue | unbounded | 无锁 | heap |
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑 从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。但是对 volatile类型的变量进行 CAS 操作,存在伪共享问题。伪共享可以参考之前的文章
-
伪共享
通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。但是ArrayBlockingQueue
又因为加锁和伪共享问题出现严重的性能问题。那么有没有一款更合适的线程安全的队列呢
Disruptor
简介
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖
github地址
Disruptor
Disruptor使用场景
基本上队列适用的场景他都适合,最常见的就是生产者-消费者
场景,并要求顺序处理
开源项目
有哪些开源框架使用了Disruptor
呢
-
Log4j2 -
incubator-shenyu -
Apache Storm -
…
Disruptor实战demo
maven版本选择

目前最新文档版本是3.4.4
,那我们就使用这个版本吧
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
实现功能
我们实现一个最简单的例子,生产者生成一个long值然后传递给消费者消费打印
1. 定义事件
-
LongEvent
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
2. 定义事件构造工厂
-
LongEventFactory
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
3. 定义事件消费者
-
LongEventHandler
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event);
}
}
4. 定义事件源(事件发布器)
事件都会有一个生成事件的源,这个例子中假设事件是由于磁盘IO或者network读取数据的时候触发的
自从 Disruptor 版本3.0以来,已经可以使用 Lambda 风格的 API 来编写发布者。这是首选的方法,因为它封装了替代方案的大部分复杂性。所以我们这里使用最新的Lambda来编码
-
LongEventMain
public class LongEventMain {
public static void main(String[] args) throws Exception {
// 1. 指定环形缓冲区的大小,必须为2的幂次(Specify the size of the ring buffer, must be power of 2)
int bufferSize = 1024;
// 2. 构建Disruptor(Construct the Disruptor)
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// 3. 连接disruptor(Connect the handler)
disruptor.handleEventsWith((event, sequence, endOfBatch) ->
System.out.println("Event: " + event));
// 4. 启动disruptor,启动所有线程运行( Start the Disruptor, starts all threads running)
disruptor.start();
// 5. 从 Disruptor 获取用于发布的环形缓冲区(Get the ring buffer from the Disruptor to be used for publishing)
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
这里需要额外注意publishEvent
方法只能引用传入的参数
如果把代码写出如下方式
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
这将创建一个捕获 lambda,这意味着它将需要实例化一个对象来保存 ByteBuffer bb 变量,因为它将 lambda 传递给 PublishEvent ()调用。这将创建额外的(不必要的)垃圾,因此如果需要较低的 GC 压力,那么将参数传递给 lambda 的调用应该是首选的。
5. 运行结果

一些核心概念
Ring Buffer
Ring Buffer(环形缓冲区)通常被认为是Disruptor的核心。然而,从3.0开始,Ring Buffer 只负责存储和更新通过 Disruptor 的数据(事件)。对于某些高级用例,甚至可以完全由用户替换。
Sequence
Disruptor 使用序列作为识别特定组件到达的位置的手段。每个使用者(事件处理器)维护一个序列,干扰器本身也是如此。大多数并发代码依赖于这些序列值的移动,因此序列支持 AtomicLong 的许多当前特性。事实上,两者之间唯一真正的区别是序列包含了额外的功能,以防止序列和其他值之间的错误共享
Sequencer
Sequencer是Disruptor的真正核心。这个接口的两个实现(单生产者,多生产者)实现了所有的并发算法,以便在生产者和消费者之间快速、正确地传递数据
Sequence Barrier
序列屏障: Sequencer produces产生一个Sequence Barrier,其中包含来自序列控制器的主要发布序列和任何依赖消费者的序列的引用。它包含用于确定是否有任何事件可供使用者处理的逻辑。
Wait Strategy
等待策略: 等待策略决定使用者将如何等待事件被生产者放入Disruptor 大致有如下几种实现方式Disruptor 使用的默认 WaitStrategy 是 BlockingWaitStrategy。在内部,BlockingWaitStrategy 使用一个典型的锁和条件变量来处理线程唤醒。BlockingWaitStrategy 是可用的等待策略中速度最慢的,但是在 CPU 使用方面却是最保守的,它将在最广泛的部署选项中提供最一致的行为。
Event
事件,生产者传递到消费者的数据单元。事件没有特定的代码表示形式,因为它完全由用户定义,就像上面的LongEvent
Event Processor
事件处理器,
Event Handler
事件处理程序,由用户实现EventHandler
接口自由编写,像上面例子中的LongEventHandler
Producer
Producer 调用 Disruptor 对 Events 进行排队的用户代码。这个概念在代码中也没有表示
官方调用图

参考
-
Disruptor官网 -
高性能队列——Disruptor -
disruptor 使用和原理 图解
总结
总的来说只是简单了解了一下Disruptor
是什么,然后就是一些简单的使用。后续有空再补上一些实现原理和开源框架实际使用的源码分析吧
原文始发于微信公众号(小奏技术):高性能并发队列Disruptor
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29894.html