大纲
1.Disruptor简介
2.Disruptor和BlockingQueue的压测对比
3.Disruptor的编程模型
4.Disruptor的数据结构与生产消费模型
5.RingBuffer + Disruptor + Sequence相关类
6.Disruptor的WaitStrategy消费者等待策略
7.EventProcessor + EventHandler等类
8.Disruptor的运行原理图
9.复杂业务需求下的编码方案和框架
10.Disruptor的串行操作
11.Disruptor的并行操作
12.Disruptor的多边形操作
13.Disruptor的多生产者和多消费者
1.Disruptor简介
(1)Disruptor是什么
(2)Disruptor的特点
(3)Disruptor的核心
(1)Disruptor是什么
Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,能够以很低的延迟产生大量的交易。LMAX是建立在JVM平台上,其核心是一个业务逻辑处理器,能够在一个线程里每秒处理6百万订单。LMAX业务逻辑处理器完全是运行在内存中,使用事件驱动方式,其核心是Disruptor。
(2)Disruptor的特点
大大简化了并发程序开发的难度,性能上比Java提供的一些并发包还好。
Disruptor是一个高性能异步处理框架,实现了观察者模式。Disruptor是无锁的、是CPU友好的。Disruptor不会清除缓存中的数据,只会覆盖缓存中的数据,不需要进行垃圾回收。Disruptor业务逻辑是纯内存操作,使用事件驱动方式。
(3)Disruptor的核心
Disruptor核心是一个RingBuffer,RingBuffer是一个数组,没有首尾指针。RingBuffer是一个首尾相接的环,用于在不同线程之间传递数据。
如果RingBuffer满了,是继续覆盖还是等待消费,由生产者和消费者决定。假设RingBuffer满了,生产者有两个选择:选择一是等待RingBuffer有空位再填充,选择二是直接覆盖。同时消费者也有两种选择:选择一是等待RingBuffer满了再消费,选择二是RingBuffer填充一个就消费一个。
RingBuffer有一个序号Sequence,这个序号指向数组中下一个可用元素。随着数据不断地填充这个数组,这个序号会一直增长,直到绕过这个环。序号指向的元素,可以通过mod计算:序号 % 长度 = 索引。建议将长度设为2的n次方,有利于二进制计算:序号 & (长度 - 1) = 索引。
Sequence通过顺序递增的序号来进行编号,以此管理正在进行交换的数据(事件)。对数据处理的过程总是沿着需要逐个递增处理,从而实现线程安全。一个Sequence用于跟踪标识某个特定的事件处理者的处理进度。
2.Disruptor和BlockingQueue的压测对比
Disruptor的性能是ArrayBlockingQueue的3倍+,这里的测试代码都是基于单线程的单生产者单消费者模式运行的。但是Disruptor本身就支持多生产者多消费者模型,测试中使用单线程明显降低了其性能。而ArrayBlockingQueue在多生产者多消费者场景下,其性能又会比单生产者单消费者场景下更低。因此,在实际应用中,Disruptor的性能会是ArrayBlockingQueue的3倍+。
public interface Constants { int EVENT_NUM_OHM = 100000000; int EVENT_NUM_FM = 50000000; int EVENT_NUM_OM = 10000000; } public class ArrayBlockingQueue4Test { public static void main(String[] args) { //初始化一个大小为100000000的有界队列ArrayBlockingQueue,为了避免在测试时由于扩容影响性能,所以一开始就初始化大小为1亿 final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000); //开始时间 final long startTime = System.currentTimeMillis(); //向容器中添加元素 new Thread(new Runnable() { public void run() { long i = 0; //首先把数据投递到有界队列ArrayBlockingQueue,单线程的生产者 while (i < Constants.EVENT_NUM_OHM) { Data data = new Data(i, "c" + i); try { queue.put(data); } catch (InterruptedException e) { e.printStackTrace(); } i++; } } }).start(); //从容器中取出元素 new Thread(new Runnable() { public void run() { int k = 0; //然后才开始消费有界队列中的数据,单线程的消费者 while (k < Constants.EVENT_NUM_OHM) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } k++; } //结束时间 long endTime = System.currentTimeMillis(); //整个main函数就是单线程运行,处理1千万数据,大概耗时3.6秒 System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms"); } }).start(); } } public class DisruptorSingle4Test { public static void main(String[] args) { int ringBufferSize = 65536; final Disruptor<Data> disruptor = new Disruptor<Data>( new EventFactory<Data>() { public Data newInstance() { return new Data(); } }, ringBufferSize, //设置为单线程运行 Executors.newSingleThreadExecutor(), //单生产者模式 ProducerType.SINGLE, //new BlockingWaitStrategy() new YieldingWaitStrategy() ); //创建一个消费者事件处理器 DataConsumer consumer = new DataConsumer(); //消费数据 disruptor.handleEventsWith(consumer); disruptor.start(); //单线程的消费者 new Thread(new Runnable() { public void run() { RingBuffer<Data> ringBuffer = disruptor.getRingBuffer(); for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) { long seq = ringBuffer.next(); Data data = ringBuffer.get(seq); data.setId(i); data.setName("c" + i); //发布一个数据被消费的事件 ringBuffer.publish(seq); } } }).start(); } } public class DataConsumer implements EventHandler<Data> { private long startTime; private int i; public DataConsumer() { this.startTime = System.currentTimeMillis(); } public void onEvent(Data data, long seq, boolean bool) throws Exception { i++; if (i == Constants.EVENT_NUM_OHM) { long endTime = System.currentTimeMillis(); //处理1千万的数据,大概耗时1.1秒 System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms"); //可见Disruptor的性能是ArrayBlockingQueue的3倍+ } } }
3.Disruptor的编程模型
(1)Disruptor的使用步骤
(2)Disruptor的使用演示
(1)Disruptor的使用步骤
步骤一:建立一个Event工厂类,用于创建数据(Event类实例对象) 步骤二:建立一个监听事件类(Event处理器),用于处理数据(Event类实例对象) 步骤三:创建Disruptor实例,配置一系列参数 步骤四:编写生产者组件,向Disruptor容器投递数据
(2)Disruptor的使用演示
一.引入pom依赖
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.2</version> </dependency>
二.建立Event工厂类用于创建数据
Event工厂类创建的数据就是Event类实例对象。
public class OrderEvent { //订单的价格 private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } } public class OrderEventFactory implements EventFactory<OrderEvent> { //返回一个空的数据对象(OrderEvent对象实例) public OrderEvent newInstance() { return new OrderEvent(); } }
三.建立监听事件类用于处理数据
监听事件类就是Event处理器,处理的数据就是Event类实例对象。
public class OrderEventHandler implements EventHandler<OrderEvent> { public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception { Thread.sleep(1000); System.err.println("消费者: " + event.getValue()); } }
四.创建Disruptor对象实例
public class Main { public static void main(String[] args) { //参数准备 OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 4; ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //参数一:eventFactory,消息(Event)工厂对象 //参数二:ringBufferSize,容器的长度 //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler //参数四:ProducerType,单生产者还是多生产者 //参数五:waitStrategy,等待策略 //1.实例化Disruptor对象 Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>( orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE,//单生产者 new BlockingWaitStrategy() ); //2.添加Event处理器,用于处理事件 //也就是构建Disruptor与消费者的一个关联关系 disruptor.handleEventsWith(new OrderEventHandler()); //3.启动disruptor disruptor.start(); ... } }
五.编写生产者组件向Disruptor容器投递数据
public class Main { public static void main(String[] args) { //参数准备 OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 4; ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //参数一:eventFactory,消息(Event)工厂对象 //参数二:ringBufferSize,容器的长度 //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler //参数四:ProducerType,单生产者还是多生产者 //参数五:waitStrategy,等待策略 //1.实例化Disruptor对象 Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>( orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy() ); //2.添加Event处理器,用于处理事件 //也就是构建Disruptor与消费者的一个关联关系 disruptor.handleEventsWith(new OrderEventHandler()); //3.启动disruptor disruptor.start(); //4.获取实际存储数据的容器: RingBuffer RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 5; i++) { bb.putLong(0, i); //向容器中投递数据 producer.sendData(bb); } disruptor.shutdown(); executor.shutdown(); } } public class OrderEventProducer { private RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data) { //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号 long sequence = ringBuffer.next(); try { //2.根据这个序号, 找到具体的"OrderEvent"元素 //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象" OrderEvent event = ringBuffer.get(sequence); //3.进行实际的赋值处理 event.setValue(data.getLong(0)); } finally { //4.提交发布操作 ringBuffer.publish(sequence); } } }
4.Disruptor的数据结构与生产消费模型
(1)Disruptor的核心与原理
(2)Disruptor的RingBuffer数据结构
(3)Disruptor的生产消费模型
(1)Disruptor的核心与原理
Disruptor的核心是RingBuffer,生产者向RingBuffer中写入元素,消费者从RingBuffer中消费元素。
(2)Disruptor的RingBuffer数据结构
RingBuffer是一个首尾相接的环(数组),用于在不同上下文(线程)之间传递数据。
RingBuffer拥有一个序号,这个序号指向数组中下一个可用的元素。随着生产者不停地往RingBuffer写入元素,这个序号也会一直增长,直到这个序号绕过这个环。
要找到RingBuffer数组中当前序号指向的元素,可以通过mod操作:序号 % 数组长度 = 数组索引。建议将长度设为2的n次方,有利于二进制计算:序号 & (长度 - 1) = 索引。
(3)Disruptor的生产消费模型
一.消费快生产慢
如果消费者从RingBuffer消费元素的速度大于生产者写入元素的速度,那么当消费者发现RingBuffer没有元素时,就要停下等待生产者写入元素。
二.生产快消费慢
如果生产者向RingBuffer写入元素的速度大于消费者消费元素的速度,那么当生产者发现RingBuffer已经满了,就要停下等待消费者消费元素。
因为RingBuffer数组的长度是有限的,生产者写入到RingBuffer的末尾时,会从RingBuffer的开始位置继续写入,这时候生产者就可能会追上消费者。
5.RingBuffer + Disruptor + Sequence相关类
(1)RingBuffer类
(2)Disruptor类
(3)Sequence类
(4)Sequencer接口
(5)SequenceBarrier类
(1)RingBuffer类
RingBuffer不仅是基于数组的缓存,也是创建Sequencer与定义WaitStrategy的入口。
(2)Disruptor类
Disruptor类可认为是一个持有RingBuffer、消费者线程池、消费者集合等引用的辅助类。
(3)Sequence类
通过顺序递增的序号来编号,管理正在进行交换的数据(事件)。对数据(事件)的处理总是沿着序号逐个递增,所以能够实现多线程下的并发安全与原子性。
一个Sequence用于跟踪标识某个特定的事件处理者的处理进度,也就是事件处理者在RingBuffer中的处理进度。每一个Producer和Consumer都有一个自己的Sequence。
Sequence可以看成是一个AtomicLong类型字段,用于标识进度。Sequence还可以防止不同Sequence之间CPU缓存的伪共享问题。
Sequence的两个作用:
作用一:用于递增标识进度
作用二:用于消除伪共享
(4)Sequencer接口
一.Sequencer包含Sequence
二.Sequencer接口有两个实现类
第一个实现类是SingleProducerSequencer
第二个实现类是MultiProducerSequencer
(5)SequenceBarrier类
作用一:用于保持对RingBuffer的生产者和消费者之间的平衡关系,比如让生产者或消费者进行等待、唤醒生产者或消费者
作用二:决定消费者是否还有可处理的事件
6.Disruptor的WaitStrategy消费者等待策略
(1)WaitStrategy接口的作用
(2)消费者等待策略的种类
(3)BlockingWaitStrategy
(4)SleepingWaitStrategy
(5)YieldingWaitStrategy
(1)WaitStrategy接口的作用
决定一个消费者将会如何等待生产者将Event投递到Disruptor。
(2)消费者等待策略的种类
BlockingWaitStrategy,通过阻塞的方式进行等待 SleepingWaitStrategy,通过休眠的方式进行等待 YieldingWaitStrategy,通过线程间的切换的方式进行等待
(3)BlockingWaitStrategy
BlockingWaitStrategy是最低效的等待策略,但是对CPU的消耗最小,并且在各种不同部署环境中能提供一致的性能表现。该策略需要使用到Java中的锁,也就是会通过ReentrantLock来阻塞消费者线程。而Disruptor本身是一个无锁并发框架,所以如果追求高性能,就不要选择这种策略。
(4)SleepingWaitStrategy
SleepingWaitStrategy是性能一般的等待策略,其性能表现和BlockingWaitStrategy差不多。但由于SleepingWaitStrategy是无锁的,所以对生产者线程的影响最小。该策略对CPU的消耗一般,通过在单个线程循环 + yield切换线程实现,所以这种策略特别适合于异步日志类似的场景。
(5)YieldingWaitStrategy
YieldingWaitStrategy的性能是最好的,适合于低延迟的系统。不过该策略对CPU的消耗最高,因为完全基于yield切换线程来实现。推荐用于要求高性能且事件处理线程数小于CPU逻辑核心数的场景中,尤其是当CPU开启了超线程特性的时候。
7.EventProcessor + EventHandler等类
(1)Event对象
(2)EventProcessor接口
(3)EventHandler接口
(4)WorkProcessor类
(1)Event对象
Disruptor中的Event指的是从生产者到消费者过程中所处理的数据对象。Disruptor中没有代码表示Event,它用泛型表示,完全由用户定义。比如创建一个RingBuffer对象时,其中的泛型就表示着这个Event对象。
(2)EventProcessor接口
EventProcessor用于处理Disruptor中的Event,拥有消费者的Sequence,它有一个实现类叫BatchEventProcessor。
由于EventProcessor接口继承自Runnable接口,所以BatchEventProcessor类会实现Runnable接口的run()方法。
其实BatchEventProcessor类是Disruptor框架中最核心的类,因为它的run()方法会不断轮询并获取数据对象,然后把数据对象(Event)交给消费者去处理,也就是即回调EventHandler接口的实现类对象的onEvent()方法。
(3)EventHandler接口
EventHandler是由用户实现的并且代表了Disruptor中的一个消费者接口,也就是消费者逻辑需要在EventHandler接口的onEvent()方法实现。
(4)WorkProcessor类
WorkProcessor类可确保每个Sequence只被一个Processor消费。注意:在单消费者模式下,使用的是EventHandler,对应于EventProcessor。在多消费者模式下,使用的是WorkHandler,对应于WorkProcessor。
8.Disruptor的运行原理图
