Disruptor—3.核心源码实现分析

大纲

1.Disruptor的生产者源码分析

2.Disruptor的消费者源码分析

3.Disruptor的WaitStrategy等待策略分析

4.Disruptor的高性能原因

5.Disruptor高性能之数据结构(内存预加载机制)

6.Disruptor高性能之内核(使用单线程写)

7.Disruptor高性能之系统内存优化(内存屏障)

8.Disruptor高性能之系统缓存优化(消除伪共享)

9.Disruptor高性能之序号获取优化(自旋 + CAS)

 

1.Disruptor的生产者源码分析

(1)通过Sequence序号发布消息

(2)通过Translator事件转换器发布消息

 

(1)通过Sequence序号发布消息

生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。

//注意:这里使用的版本是3.4.4 //单生产者单消费者的使用示例 public class Main {     public static void main(String[] args) {         //参数准备         OrderEventFactory orderEventFactory = new OrderEventFactory();         int ringBufferSize = 4;         ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());            //参数一:eventFactory,消息(Event)工厂对象         //参数二:ringBufferSize,容器的长度         //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler         //参数四:ProducerType,单生产者还是多生产者         //参数五:waitStrategy,等待策略         //1.实例化Disruptor对象         Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(             orderEventFactory,             ringBufferSize,             executor,             ProducerType.SINGLE,             new BlockingWaitStrategy()         );            //2.添加Event处理器,用于处理事件         //也就是构建Disruptor与消费者的一个关联关系         disruptor.handleEventsWith(new OrderEventHandler());            //3.启动Disruptor         disruptor.start();            //4.获取实际存储数据的容器: RingBuffer         RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();         OrderEventProducer producer = new OrderEventProducer(ringBuffer);         ByteBuffer bb = ByteBuffer.allocate(8);         for (long i = 0; i < 5; i++) {             bb.putLong(0, i);             //向容器中投递数据             producer.sendData(bb);         }         disruptor.shutdown();         executor.shutdown();     } }  public class OrderEventProducer {     private RingBuffer<OrderEvent> ringBuffer;          public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {         this.ringBuffer = ringBuffer;     }          public void sendData(ByteBuffer data) {         //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号         long sequence = ringBuffer.next();         try {             //2.根据这个序号, 找到具体的"OrderEvent"元素             //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"             OrderEvent event = ringBuffer.get(sequence);             //3.进行实际的赋值处理             event.setValue(data.getLong(0));       
  } finally {             //4.提交发布操作             ringBuffer.publish(sequence);         }     } }  public class OrderEventHandler implements EventHandler<OrderEvent> {     public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {         Thread.sleep(1000);         System.err.println("消费者: " + event.getValue());     } }
//多生产者多消费者的使用示例 public class Main {     public static void main(String[] args) throws InterruptedException {         //1.创建RingBuffer         RingBuffer<Order> ringBuffer = RingBuffer.create(             ProducerType.MULTI,//多生产者             new EventFactory<Order>() {                 public Order newInstance() {                     return new Order();                 }             },             1024 * 1024,             new YieldingWaitStrategy()         );          //2.通过ringBuffer创建一个屏障         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();          //3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口         Consumer[] consumers = new Consumer[10];         for (int i = 0; i < consumers.length; i++) {             consumers[i] = new Consumer("C" + i);         }          //4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool         WorkerPool<Order> workerPool = new WorkerPool<Order>(             ringBuffer,             sequenceBarrier,             new EventExceptionHandler(),             consumers         );          //5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中         ringBuffer.addGatingSequences(workerPool.getWorkerSequences());          //6.启动workerPool         workerPool.start(Executors.newFixedThreadPool(5));          final CountDownLatch latch = new CountDownLatch(1);         for (int i = 0; i < 100; i++) {             final Producer producer = new Producer(ringBuffer);             new Thread(new Runnable() {                 public void run() {                     try {                         latch.await();                     } catch (Exception e) {                         e.printStackTrace();                     }                     for (int j = 0; j < 100; j++) {                         producer.sendData(UUID.randomUUID().toString());                     }                 }             }).start();         }          Thread.sleep(2000);         System.err.println("----------线程创建完毕,开始生产数据----------");         latch.countDown();         
Thread.sleep(10000);         System.err.println("任务总数:" + consumers[2].getCount());     } }  public class Producer {     private RingBuffer<Order> ringBuffer;          public Producer(RingBuffer<Order> ringBuffer) {         this.ringBuffer = ringBuffer;     }      public void sendData(String uuid) {         //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号         long sequence = ringBuffer.next();         try {             //2.根据这个序号, 找到具体的"Order"元素             //注意:此时获取的Order对象是一个没有被赋值的"空对象"             Order order = ringBuffer.get(sequence);             //3.进行实际的赋值处理             order.setId(uuid);         } finally {             //4.提交发布操作             ringBuffer.publish(sequence);         }     } }  public class Consumer implements WorkHandler<Order> {     private static AtomicInteger count = new AtomicInteger(0);     private String consumerId;     private Random random = new Random();      public Consumer(String consumerId) {         this.consumerId = consumerId;     }      public void onEvent(Order event) throws Exception {         Thread.sleep(1 * random.nextInt(5));         System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());         count.incrementAndGet();     }      public int getCount() {         return count.get();     } }

其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。

//Seven unused long fields placed ahead of the fields declared in RingBufferFields.
//NOTE(review): presumably cache-line padding against false sharing (topic 8 of the outline) - not shown in this excerpt.
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

//Holds the pre-filled slot array of the ring buffer together with its Sequencer.
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    //bufferSize - 1; used to mask a sequence down to a slot index
    private final long indexMask;

    //The circular array that stores the events
    private final Object[] entries;
    protected final int bufferSize;

    //The sequencer of this RingBuffer; it represents the producer side
    protected final Sequencer sequencer;

    //Validates the buffer size reported by the sequencer, allocates the slot array and pre-fills it.
    //Throws IllegalArgumentException if the buffer size is below 1 or not a power of 2.
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //A power of two has exactly one bit set
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //Allocate the array with BUFFER_PAD extra slots on each side of the usable range
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Pre-load: create every event instance up front
        fill(eventFactory);
    }

    //Stores one factory-created event in each of the bufferSize usable slots (offset by BUFFER_PAD).
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    //Reads the slot for the given sequence through Unsafe; the slot index is sequence & indexMask.
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //Seven further unused long fields declared after the inherited fields
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Increment and return the next sequence for the ring buffer.
    //Calls of this method should ensure that they always publish the sequence afterward.
    //E.g.
    //long sequence = ringBuffer.next();
    //try {
    //    Event e = ringBuffer.get(sequence);
    //    //Do some work with the event.
    //} finally {
    //    ringBuffer.publish(sequence);
    //}
    //@return The next sequence to publish to.
    //@see RingBuffer#publish(long)
    //@see RingBuffer#get(long)
    @Override
    public long next() {
        return sequencer.next();
    }

    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }

    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data
    //before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that
    //your current consumer sequence and less than or equal to the value returned from the
    //SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        //Delegates to elementAt() inherited from RingBufferFields
        return elementAt(sequence);
    }
    ...
}

RingBuffer的sequencer属性会在创建RingBuffer对象时传入,而创建RingBuffer对象的时机则是在初始化Disruptor的时候。

 

在Disruptor的构造方法中,会调用RingBuffer的create()方法,RingBuffer的create()方法会根据不同的生产者类型来初始化sequencer属性。

 

Sequencer接口实现类的实例代表的就是生产者。在上面单生产者示例的main()方法中,初始化Disruptor时会创建一个SingleProducerSequencer实例来代表生产者。在上面多生产者示例的main()方法中,调用RingBuffer.create(ProducerType.MULTI, ...)时会创建一个MultiProducerSequencer实例来代表生产者,该实例由所有生产者线程共享。

public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    //Create a new Disruptor.
    //The ring buffer is built by RingBuffer.create() from the producer type, factory, size and wait strategy.
    //@param eventFactory   the factory to create events in the ring buffer.
    //@param ringBufferSize the size of the ring buffer, must be power of 2.
    //@param executor       an Executor to execute event processors.
    //@param producerType   the claim strategy to use for the ring buffer.
    //@param waitStrategy   the wait strategy to use for the ring buffer.
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    //Stores the already constructed ring buffer and the executor.
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
    //@param producerType producer type to use ProducerType.
    //@param factory used to create events within the ring buffer.
    //@param bufferSize number of elements to create within the ring buffer.
    //@param waitStrategy used to determine how to wait for new elements to become available.
    //@throws IllegalStateException if the producer type is neither SINGLE nor MULTI.
    public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                //In single-producer mode the producer is a SingleProducerSequencer instance
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                //In multi-producer mode the producer is a MultiProducerSequencer instance
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    //Create a new single producer RingBuffer with the specified wait strategy.
    //@param <E> Class of the event stored in the ring buffer.
    //@param factory      used to create the events within the ring buffer.
    //@param bufferSize   number of elements to create within the ring buffer.
    //@param waitStrategy used to determine how to wait for new elements to become available.
    //@return a constructed ring buffer.
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    //Create a new multiple producer RingBuffer with the specified wait strategy.
    //@param <E> Class of the event stored in the ring buffer.
    //@param factory      used to create the events within the ring buffer.
    //@param bufferSize   number of elements to create within the ring buffer.
    //@param waitStrategy used to determine how to wait for new elements to become available.
    //@return a constructed ring buffer.
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    //Construct a RingBuffer with the full option set.
    //@param eventFactory to newInstance entries for filling the RingBuffer
    //@param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    ...
}

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    //bufferSize - 1
    private final long indexMask;
    //The circular array that stores the events
    private final Object[] entries;
    protected final int bufferSize;
    //The sequencer of this RingBuffer; it represents the producer side
    protected final Sequencer sequencer;

    //Validates the buffer size reported by the sequencer, allocates the slot array and pre-fills it.
    //Throws IllegalArgumentException if the buffer size is below 1 or not a power of 2.
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //A power of two has exactly one bit set
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //Allocate the array with BUFFER_PAD extra slots on each side of the usable range
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Pre-load: create every event instance up front
        fill(eventFactory);
    }

    //Stores one factory-created event in each of the bufferSize usable slots (offset by BUFFER_PAD).
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    ...
}

SingleProducerSequencer的publish()方法在发布事件消息时,首先会设置当前生产者的Sequence,然后会通过等待策略通知阻塞的消费者。

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    ...
    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }
    ...
}

public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    //Size of the ring array
    protected final int bufferSize;
    //Wait strategy
    protected final WaitStrategy waitStrategy;
    //Progress of the producer
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    //Each Sequence corresponds to one consumer (an EventHandler or a WorkHandler).
    //They are added through SEQUENCE_UPDATER by RingBuffer.addGatingSequences(),
    //e.g. when Disruptor.handleEventsWith() and similar methods are executed.
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...

    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy the strategy used by consumers waiting on sequences.
    //@throws IllegalArgumentException if bufferSize is below 1 or not a power of 2.
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //A power of two has exactly one bit set
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    //Sequence most recently claimed by the producer; starts at -1
    protected long nextValue = Sequence.INITIAL_VALUE;
    //Cached minimum of the consumers' gating sequences; starts at -1
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    protected long p1, p2, p3, p4, p5, p6, p7;

    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize   the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    //Publishes the sequence: advances the cursor and wakes up blocked consumers.
    @Override
    public void publish(long sequence) {
        //Record the producer's progress; cursor is the producer's Sequence
        cursor.set(sequence);
        //Notify blocked consumers through the wait strategy
        waitStrategy.signalAllWhenBlocking();
    }

    //Claims the next single sequence.
    @Override
    public long next() {
        return next(1);
    }

    //Claims the next n sequences, waiting while the slowest consumer is still a full lap behind.
    //@param n number of sequences to claim, must be at least 1.
    //@return the highest claimed sequence.
    //@throws IllegalArgumentException if n is below 1.
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long nextValue = this.nextValue;
        long nextSequence = nextValue + n;
        //Claiming nextSequence overwrites the slot last used by wrapPoint, so consumers must have passed it
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;
        //Slow path: the cached consumer minimum is not sufficient to prove there is room
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            //StoreLoad fence: make the cursor visible before re-reading the gating sequences,
            //matching hasAvailableCapacity() on the tryNext() path
            cursor.setVolatile(nextValue);
            long minSequence;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                LockSupport.parkNanos(1L);
            }
            this.cachedValue = minSequence;
        }
        this.nextValue = nextSequence;
        return nextSequence;
    }
    ...
}

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

//Concurrent sequence class used for tracking the progress of the ring buffer and event processors.
//Support a number of concurrent operations including CAS and order writes.
//Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        UNSAFE = Util.getUnsafe();
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        } catch (final Exception e) {
            //getDeclaredField throws a checked exception, which a static initializer may not propagate
            throw new RuntimeException(e);
        }
    }

    //Create a sequence initialised to -1.
    public Sequence() {
        this(INITIAL_VALUE);
    }

    //Create a sequence with a specified initial value.
    //@param initialValue The initial value for this sequence.
    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    //Perform a volatile read of this sequence's value.
    //@return The current value of the sequence.
    public long get() {
        return value;
    }

    //Perform an ordered write of this sequence.
    //The intent is a Store/Store barrier between this write and any previous store.
    //@param value The new value for the sequence.
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    //Performs a volatile write of this sequence.
    //The intent is a Store/Store barrier between this write and
    //any previous write and a Store/Load barrier between this write and
    //any subsequent volatile read.
    //@param value The new value for the sequence.
    public void setVolatile(final long value) {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    //Perform a compare and set operation on the sequence.
    //@param expectedValue The expected current value.
    //@param newValue The value to update to.
    //@return true if the operation succeeds, false otherwise.
    public boolean compareAndSet(final long expectedValue, final long newValue) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

    //Atomically increment the sequence by one.
    //@return The value after the increment
    public long incrementAndGet() {
        return addAndGet(1L);
    }

    //Atomically add the supplied value.
    //@param increment The value to add to the sequence.
    //@return The value after the increment.
    public long addAndGet(final long increment) {
        long currentValue;
        long newValue;
        //Retry until the CAS succeeds
        do {
            currentValue = get();
            newValue = currentValue + increment;
        } while (!compareAndSet(currentValue, newValue));
        return newValue;
    }

    @Override
    public String toString() {
        return Long.toString(get());
    }
}

MultiProducerSequencer的publish()方法在发布事件消息时,则会通过Unsafe设置sequence在int数组availableBuffer中对应元素的值(该值表示sequence所处的圈数),然后同样通过等待策略通知阻塞的消费者。

public final class MultiProducerSequencer extends AbstractSequencer {     private static final Unsafe UNSAFE = Util.getUnsafe();     private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);     private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);     private final int[] availableBuffer;     private final int indexMask;     private final int indexShift;          //Construct a Sequencer with the selected wait strategy and buffer size.     //@param bufferSize   the size of the buffer that this will sequence over.     //@param waitStrategy for those waiting on sequences.     public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {         super(bufferSize, waitStrategy);         availableBuffer = new int[bufferSize];         indexMask = bufferSize - 1;         indexShift = Util.log2(bufferSize);         initialiseAvailableBuffer();     }          private void initialiseAvailableBuffer() {         for (int i = availableBuffer.length - 1; i != 0; i--) {             setAvailableBufferValue(i, -1);         }         setAvailableBufferValue(0, -1);     }          private void setAvailableBufferValue(int index, int flag) {         long bufferAddress = (index * SCALE) + BASE;         UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);     }      @Override     public void publish(final long sequence) {         setAvailable(sequence);         waitStrategy.signalAllWhenBlocking();     }          //The below methods work on the availableBuffer flag.     //The prime reason is to avoid a shared sequence object between publisher threads.     //(Keeping single pointers tracking start and end would require coordination between the threads).     //--  Firstly we have the constraint that the delta between the cursor and minimum gating sequence      //will never be larger than the buffer size (the code in next/tryNext in the Sequence takes care of that).     
//-- Given that; take the sequence value and mask off the lower portion of the sequence      //as the index into the buffer (indexMask). (aka modulo operator)     //-- The upper portion of the sequence becomes the value to check for availability.     //ie: it tells us how many times around the ring buffer we've been (aka division)     //-- Because we can't wrap without the gating sequences moving forward      //(i.e. the minimum gating sequence is effectively our last available position in the buffer),      //when we have new data and successfully claimed a slot we can simply write over the top.     private void setAvailable(final long sequence) {         setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));     }      private int calculateIndex(final long sequence) {         return ((int) sequence) & indexMask;     }          private int calculateAvailabilityFlag(final long sequence) {         return (int) (sequence >>> indexShift);     }          @Override     public long next() {         return next(1);     }          @Override     public long next(int n) {         if (n < 1) {             throw new IllegalArgumentException("n must be > 0");         }                   long current;         long next;         do {             current = cursor.get();             next = current + n;             long wrapPoint = next - bufferSize;             long cachedGatingSequence = gatingSequenceCache.get();                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {                 long gatingSequence = Util.getMinimumSequence(gatingSequences, current);                 if (wrapPoint > gatingSequence) {                     LockSupport.parkNanos(1);                      continue;                 }                 gatingSequenceCache.set(gatingSequence);             } else if (cursor.compareAndSet(current, next)) {                 break;             }         } while (true);         return next;     }     ... }

(2)通过Translator事件转换器发布消息

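生产者还可以直接调用RingBuffer的tryPublishEvent()方法来完成发布事件消息到RingBuffer。该方法首先会调用Sequencer接口的tryNext()方法获取sequence序号,然后调用RingBuffer的translateAndPublish()方法:先根据该sequence序号从RingBuffer的环形数组中获取对应的元素,接着通过EventTranslator的translateTo()方法将事件消息赋值替换到该元素中,最后调用Sequencer接口的publish()方法设置当前生产者的sequence序号来完成事件消息的发布。如果RingBuffer的剩余容量不足,tryNext()方法会抛出InsufficientCapacityException,此时tryPublishEvent()方法会返回false。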

abstract class RingBufferPad {     protected long p1, p2, p3, p4, p5, p6, p7; }  abstract class RingBufferFields<E> extends RingBufferPad {     ...     private static final Unsafe UNSAFE = Util.getUnsafe();     private final long indexMask;     //环形数组存储事件消息     private final Object[] entries;     protected final int bufferSize;     //RingBuffer的sequencer属性代表了当前线程对应的生产者     protected final Sequencer sequencer;          RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {         this.sequencer = sequencer;         this.bufferSize = sequencer.getBufferSize();         if (bufferSize < 1) {             throw new IllegalArgumentException("bufferSize must not be less than 1");         }         if (Integer.bitCount(bufferSize) != 1) {             throw new IllegalArgumentException("bufferSize must be a power of 2");         }         this.indexMask = bufferSize - 1;         //初始化数组         this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];         //内存预加载         fill(eventFactory);     }          private void fill(EventFactory<E> eventFactory) {         for (int i = 0; i < bufferSize; i++) {             entries[BUFFER_PAD + i] = eventFactory.newInstance();         }     }          protected final E elementAt(long sequence) {         return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));     }     ... }  public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {     //值为-1     public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;     protected long p1, p2, p3, p4, p5, p6, p7;      //Construct a RingBuffer with the full option set.     //@param eventFactory to newInstance entries for filling the RingBuffer     //@param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.     
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }

    //Tries to claim the next sequence without blocking and, on success, translates and publishes the event.
    //@param translator the translator that fills in the claimed event
    //@return true if the event was published, false if the sequencer reported insufficient capacity
    @Override
    public boolean tryPublishEvent(EventTranslator<E> translator) {
        try {
            final long sequence = sequencer.tryNext();
            translateAndPublish(translator, sequence);
            return true;
        } catch (InsufficientCapacityException e) {
            return false;
        }
    }

    //Lets the translator populate the event stored at the given sequence, then publishes that sequence.
    //The publish happens in a finally block, so the claimed sequence is published even if the translator throws.
    private void translateAndPublish(EventTranslator<E> translator, long sequence) {
        try {
            translator.translateTo(get(sequence), sequence);
        } finally {
            sequencer.publish(sequence);
        }
    }

    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that
    //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        //Delegates to elementAt() (per the article, inherited from the parent class RingBufferFields)
        return elementAt(sequence);
    }
    ...
}

//Base class for sequencers: holds the buffer size, the wait strategy, the producer cursor and the gating sequences.
public abstract class AbstractSequencer implements Sequencer {
    //Field updater used to update the gatingSequences array reference
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    //Size of the ring array
    protected final int bufferSize;
    //Wait strategy
    protected final WaitStrategy waitStrategy;
    //Progress of the producer
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    //Per the article, each Sequence here corresponds to one consumer (an EventHandler or a WorkHandler).
    //They are added through SEQUENCE_UPDATER by RingBuffer.addGatingSequences()
    //when Disruptor.handleEventsWith() and similar methods are executed.
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...

    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy the wait strategy stored on this sequencer
    //@throws IllegalArgumentException if bufferSize is less than 1 or is not a power of 2
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //A power of two has exactly one bit set
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

//Sequencer for the single-producer case; nextValue and cachedValue are plain (non-atomic) long fields.
public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    //Never read in the visible code; presumably padding fields - TODO confirm
    protected long p1, p2, p3, p4, p5, p6, p7;

    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize   the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    ...

    //Claims a single sequence; see tryNext(int).
    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    //Claims the next n sequences without blocking.
    //@param n number of sequences to claim, must be at least 1
    //@return the highest claimed sequence
    //@throws IllegalArgumentException if n is less than 1
    //@throws InsufficientCapacityException if hasAvailableCapacity() reports that n slots are not available
    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        if (!hasAvailableCapacity(n, true)) {
            throw InsufficientCapacityException.INSTANCE;
        }
        //Advance the producer's sequence by n and return the new value
        long nextSequence = this.nextValue += n;
        return nextSequence;
    }

    //Checks whether requiredCapacity more slots can be claimed without passing the minimum gating sequence.
    //@param requiredCapacity number of slots the caller wants to claim
    //@param doStore whether to write nextValue to the cursor with setVolatile() before re-reading the gating sequences
    //@return false if the wrap point would exceed the minimum gating sequence, true otherwise
    private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) {
        long nextValue = this.nextValue;
        //Sequence that would be overwritten once the claim wraps around the buffer
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        //Only re-read the gating sequences when the cached minimum cannot prove there is room
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            if (doStore) {
                cursor.setVolatile(nextValue);//StoreLoad fence
            }
            long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
            //Refresh the cached minimum for subsequent calls
            this.cachedValue = minSequence;
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void publish(long sequence) {
        //Set the sequence of the current producer
        cursor.set(sequence);
        //Notify blocked consumers through the wait strategy
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

//Holds the two mutable fields of the single-producer sequencer.
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    //Current sequence of the producer, initialised to Sequence.INITIAL_VALUE (-1 according to the article)
    protected long nextValue = Sequence.INITIAL_VALUE;
    //Cached minimum sequence of the consumers, initialised to Sequence.INITIAL_VALUE (-1 according to the article)
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

//Declares seven long fields ahead of the fields in SingleProducerSequencerFields.
abstract class SingleProducerSequencerPad extends AbstractSequencer {
    //Never read in the visible code; presumably padding fields - TODO confirm
    protected long p1, p2, p3, p4, p5, p6, p7;

    SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

//Sequencer for the multi-producer case; sequences are claimed with a CAS loop on the cursor.
public final class MultiProducerSequencer extends AbstractSequencer {
    ...
    //Claims a single sequence; see tryNext(int).
    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    //Claims the next n sequences without blocking.
    //@param n number of sequences to claim, must be at least 1
    //@return the highest claimed sequence
    //@throws IllegalArgumentException if n is less than 1
    //@throws InsufficientCapacityException if hasAvailableCapacity() reports that n slots are not available
    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        //Retry until the cursor is moved from current to next by this thread
        do {
            current = cursor.get();
            next = current + n;
            if (!hasAvailableCapacity(gatingSequences, n, current)) {
                throw InsufficientCapacityException.INSTANCE;
            }
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    //Checks whether requiredCapacity more slots can be claimed after cursorValue without passing the minimum gating sequence.
    //@return false if the wrap point would exceed the minimum gating sequence, true otherwise
    private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
        //Sequence that would be overwritten once the claim wraps around the buffer
        long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();

        //Only re-read the gating sequences when the cached minimum cannot prove there is room
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {
            long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
            gatingSequenceCache.set(minSequence);
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }

    //Marks the sequence as available via setAvailable(), then signals the wait strategy.
    @Override
    public void publish(final long sequence) {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

//Implementations translate (write) data representations into events claimed from the RingBuffer.
//When publishing to the RingBuffer, provide an EventTranslator.
//The RingBuffer will select the next available event by sequence and provide it to the EventTranslator (which should update the event),
//before publishing the sequence update.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public interface EventTranslator<T> {
    //Translate a data representation into fields set in given event
    //@param event    into which the data should be translated.
    //@param sequence that is assigned to event.
    void translateTo(T event, long sequence);
}

 

2.Disruptor的消费者源码分析

Disruptor的消费者主要由BatchEventProcessor类和WorkProcessor类来实现:先通过Disruptor的handleEventsWith()方法或者handleEventsWithWorkerPool()方法绑定消费者,再通过Disruptor的start()方法启动消费者。

 

执行Disruptor的handleEventsWith()方法绑定消费者时,会创建BatchEventProcessor对象,并将其添加到Disruptor的consumerRepository属性。

 

执行Disruptor的handleEventsWithWorkerPool()方法绑定消费者时,则会创建WorkerPool对象(WorkerPool会为每个WorkHandler创建一个WorkProcessor对象),并将该WorkerPool对象封装成WorkerPoolInfo后添加到Disruptor的consumerRepository属性。

 

执行Disruptor的start()方法启动Disruptor实例时,便会通过线程池执行BatchEventProcessor里的run()方法,或者通过线程池执行WorkProcessor里的run()方法。

 

执行BatchEventProcessor的run()方法时,会先通过SequenceBarrier的waitFor()方法获取可消费的最大序号,然后从RingBuffer中逐个取出数据交给EventHandler处理,最后通过更新BatchEventProcessor的sequence来记录消费进度。

 

执行WorkProcessor的run()方法时,会先通过CAS从WorkerPool共享的workSequence中抢占下一个要消费的序号,并更新WorkProcessor自己的sequence来记录消费进度,然后从RingBuffer中取出该序号对应的数据交给WorkHandler处理。

public class Main {     public static void main(String[] args) {         //参数准备         OrderEventFactory orderEventFactory = new OrderEventFactory();         int ringBufferSize = 4;         ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());            //参数一:eventFactory,消息(Event)工厂对象         //参数二:ringBufferSize,容器的长度         //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler         //参数四:ProducerType,单生产者还是多生产者         //参数五:waitStrategy,等待策略         //1.实例化Disruptor对象         Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(             orderEventFactory,             ringBufferSize,             executor,             ProducerType.SINGLE,             new BlockingWaitStrategy()         );            //2.添加Event处理器,用于处理事件         //也就是构建Disruptor与消费者的一个关联关系         //方式一:使用handleEventsWith()方法         disruptor.handleEventsWith(new OrderEventHandler());         //方式二:使用handleEventsWithWorkerPool()方法         //disruptor.handleEventsWithWorkerPool(workHandlers);            //3.启动disruptor         disruptor.start();            //4.获取实际存储数据的容器: RingBuffer         RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();         OrderEventProducer producer = new OrderEventProducer(ringBuffer);         ByteBuffer bb = ByteBuffer.allocate(8);         for (long i = 0; i < 5; i++) {             bb.putLong(0, i);             //向容器中投递数据             producer.sendData(bb);         }         disruptor.shutdown();         executor.shutdown();     } }
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    //Binds consumers: registers the EventHandlers and creates an EventProcessor for each of them.
    //Set up event handlers to handle events from the ring buffer.
    //These handlers will process events as soon as they become available, in parallel.
    //This method can be used as the start of a chain.
    //For example if the handler A must process events before handler B: dw.handleEventsWith(A).then(B);
    //@param handlers the event handlers that will process events.
    //@return a EventHandlerGroup that can be used to chain dependencies.
    @SuppressWarnings("varargs")
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }

    //Creates one BatchEventProcessor per handler and adds it to consumerRepository.
    //@param barrierSequences sequences the new processors' barrier is built from
    //@param eventHandlers handlers to wrap
    //@return a group holding the sequences of the newly created processors
    //@throws IllegalStateException if the Disruptor has already been started
    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            //Create the BatchEventProcessor
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            //Add the BatchEventProcessor to consumerRepository
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            //One consumer thread corresponds to one batchEventProcessor.
            //Each batchEventProcessor holds a Sequence that represents its consumption progress.
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        //Add the consumers' Sequence objects to the gating sequences of the producer's sequencer
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

    //Adds the new processors' sequences as gating sequences, removes the barrier sequences
    //from the gating sequences and un-marks them as end of chain in consumerRepository.
    //Does nothing when processorSequences is empty.
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    //Rejects handler registration once the Disruptor has been started.
    private void checkNotStarted() {
        //Plain read of the started flag (the flag itself is flipped with a CAS in checkOnlyStartedOnce())
        if (started.get()) {
            throw new IllegalStateException("All event handlers must be added before calling starts.");
        }
    }
    ...

    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            //Disruptor.handleEventsWith() calls Disruptor.createEventProcessors(), which passes each new
            //BatchEventProcessor to consumerRepository.add(); add() wraps it in an EventProcessorInfo
            //(a ConsumerInfo). So for those consumers this call is EventProcessorInfo.start().
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

    private void checkOnlyStartedOnce() {
        //The started flag is flipped with a CAS, so only the first caller succeeds
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    //Adds an event processor (here a BatchEventProcessor) to the repository.
    //@param eventprocessor the processor to register
    //@param handler the handler the processor delegates to; used as key of the handler map
    //@param barrier the barrier the processor waits on
    public void add(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        //Wrap the processor in an EventProcessorInfo, i.e. a ConsumerInfo
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
    ...
}

//ConsumerInfo that bundles an EventProcessor with its handler and barrier.
class EventProcessorInfo<T> implements ConsumerInfo {
    private final EventProcessor eventprocessor;
    private final EventHandler<? super T> handler;
    private final SequenceBarrier barrier;
    private boolean endOfChain = true;

    EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        this.eventprocessor = eventprocessor;
        this.handler = handler;
        this.barrier = barrier;
    }
    ...

    @Override
    public void start(final Executor executor) {
        //Hands the processor to the executor, which runs its run() method.
        //The executor passed in by Disruptor.start() is the Disruptor's own executor field.
        executor.execute(eventprocessor);
    }
    ...
}

//Convenience class for handling the batching semantics of consuming entries from
//a RingBuffer and delegating the available events to an EventHandler.
//If the EventHandler also implements LifecycleAware it will be notified just after
//the thread is started and just before the thread is shutdown.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class BatchEventProcessor<T> implements EventProcessor {
    //Lifecycle states held in 'running'; run() and processEvents() compare against these values
    private static final int IDLE = 0;
    private static final int HALTED = IDLE + 1;
    private static final int RUNNING = HALTED + 1;

    //An int state (not a boolean) because run() distinguishes IDLE, HALTED and RUNNING
    private final AtomicInteger running = new AtomicInteger(IDLE);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;
    //Non-null only when the handler implements BatchStartAware; read by processEvents()
    private final BatchStartAware batchStartAware;

    //Construct a EventProcessor that will automatically track the progress by
    //updating its sequence when the EventHandler#onEvent(Object, long, boolean) method returns.
    //@param dataProvider to which events are published.
    //@param sequenceBarrier on which it is waiting.
    //@param eventHandler is the delegate to which events are dispatched.
    public BatchEventProcessor(final DataProvider<T> dataProvider, final SequenceBarrier sequenceBarrier, final EventHandler<? super T> eventHandler) {
        //When created by Disruptor.createEventProcessors(), dataProvider is the Disruptor's ringBuffer
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;
        if (eventHandler instanceof SequenceReportingEventHandler) {
            ((SequenceReportingEventHandler<?>)eventHandler).setSequenceCallback(sequence);
        }
        batchStartAware = (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }
    ...

    //It is ok to have another thread rerun this method after a halt().
    //Consumes the data in the RingBuffer and records the progress in 'sequence'.
    //@throws IllegalStateException if the processor is already in the RUNNING state
    @Override
    public void run() {
        if (running.compareAndSet(IDLE, RUNNING)) {
            sequenceBarrier.clearAlert();
            notifyStart();
            try {
                if (running.get() == RUNNING) {
                    processEvents();
                }
            } finally {
                notifyShutdown();
                running.set(IDLE);
            }
        } else {
            //This is a little bit of guess work.
            //The running state could of changed to HALTED by this point.
            //However, Java does not have compareAndExchange which is the only way to get it exactly correct.
            if (running.get() == RUNNING) {
                throw new IllegalStateException("Thread is already running");
            } else {
                earlyExit();
            }
        }
    }

    //Main consume loop: waits for available sequences, dispatches each event to the handler,
    //then publishes the consumer's progress. Exits only when an alert arrives while not RUNNING.
    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true) {
            try {
                //sequenceBarrier.waitFor() decides whether the consumer has to wait for the producer
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    //Report the size of the batch that is about to be processed
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                while (nextSequence <= availableSequence) {
                    //Fetch the data to consume from the RingBuffer
                    event = dataProvider.get(nextSequence);
                    //Invoke the consumer's onEvent(); the last argument flags the end of the batch
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                //Record the consumer's current progress
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                //Report the failure, then mark the failing sequence as consumed and move on
                handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

    //Used when run() is entered while not IDLE and not RUNNING: only fires the lifecycle callbacks.
    private void earlyExit() {
        notifyStart();
        notifyShutdown();
    }

    //Forwards a timeout to the handler if it implements TimeoutHandler; failures go to handleEventException().
    private void notifyTimeout(final long availableSequence) {
        try {
            if (timeoutHandler != null) {
                timeoutHandler.onTimeout(availableSequence);
            }
        } catch (Throwable e) {
            handleEventException(e, availableSequence, null);
        }
    }

    //Notifies the EventHandler when this processor is starting up
    private void notifyStart() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onStart();
            } catch (final Throwable ex) {
                handleOnStartException(ex);
            }
        }
    }

    //Notifies the EventHandler immediately prior to this processor shutting down
    private void notifyShutdown() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onShutdown();
            } catch (final Throwable ex) {
                handleOnShutdownException(ex);
            }
        }
    }
    ...
}
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    //Registers the WorkHandlers; a WorkerPool (and one WorkProcessor per handler) is created for them.
    //Set up a WorkerPool to distribute an event to one of a pool of work handler threads.
    //Each event will only be processed by one of the work handlers.
    //The Disruptor will automatically start this processors when #start() is called.
    //@param workHandlers the work handlers that will process events.
    //@return a {@link EventHandlerGroup} that can be used to chain dependencies.
    @SafeVarargs
    @SuppressWarnings("varargs")
    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {
        return createWorkerPool(new Sequence[0], workHandlers);
    }

    //Creates a WorkerPool and adds it to consumerRepository.
    //@param barrierSequences sequences the pool's barrier is built from
    //@param workHandlers handlers to distribute the events across
    //@return a group holding the sequences of the pool's workers
    EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        //Create the WorkerPool, which in turn creates a WorkProcessor for every WorkHandler
        final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        //Add the WorkerPool to consumerRepository
        consumerRepository.add(workerPool, sequenceBarrier);
        final Sequence[] workerSequences = workerPool.getWorkerSequences();
        //Add the consumers' Sequence objects to the gating sequences of the producer's sequencer
        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
        return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
    }

    //Adds the new processors' sequences as gating sequences, removes the barrier sequences
    //from the gating sequences and un-marks them as end of chain in consumerRepository.
    //Does nothing when processorSequences is empty.
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...

    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            //Disruptor.handleEventsWithWorkerPool() calls Disruptor.createWorkerPool(), which passes the new
            //WorkerPool to consumerRepository.add(); add() wraps it in a WorkerPoolInfo (a ConsumerInfo).
            //So for those consumers this call is WorkerPoolInfo.start().
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

    private void checkOnlyStartedOnce() {
        //The started flag is flipped with a CAS, so only the first caller succeeds
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    //Adds a WorkerPool to the repository.
    //@param workerPool the pool to register
    //@param sequenceBarrier the barrier the pool's workers wait on
    public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        //Wrap the pool in a WorkerPoolInfo, i.e. a ConsumerInfo
        final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<>(workerPool, sequenceBarrier);
        consumerInfos.add(workerPoolInfo);
        //Every worker sequence of the pool maps to the same WorkerPoolInfo
        for (Sequence sequence : workerPool.getWorkerSequences()) {
            eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
        }
    }
    ...
}

//ConsumerInfo that bundles a WorkerPool with its barrier.
class WorkerPoolInfo<T> implements ConsumerInfo {
    private final WorkerPool<T> workerPool;
    private final SequenceBarrier sequenceBarrier;
    private boolean endOfChain = true;

    WorkerPoolInfo(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        this.workerPool = workerPool;
        this.sequenceBarrier = sequenceBarrier;
    }

    //Delegates to WorkerPool.start(), which submits the pool's WorkProcessors to the executor.
    @Override
    public void start(Executor executor) {
        workerPool.start(executor);
    }
    ...
}

//Owns one WorkProcessor per WorkHandler; all processors share a single workSequence.
public final class WorkerPool<T> {
    private final AtomicBoolean started = new AtomicBoolean(false);
    //Sequence shared by all WorkProcessors of this pool
    private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    //WorkProcessors are created to wrap each of the provided WorkHandlers
    private final WorkProcessor<?>[] workProcessors;

    //Create a worker pool to enable an array of WorkHandlers to consume published sequences.
    //This option requires a pre-configured RingBuffer which must have RingBuffer#addGatingSequences(Sequence...) called before the work pool is started.
    //@param ringBuffer       of events to be consumed.
    //@param sequenceBarrier  on which the workers will depend.
    //@param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
    //@param workHandlers     to distribute the work load across.
    @SafeVarargs
    public WorkerPool(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... workHandlers) {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        //Create one WorkProcessor per WorkHandler
        workProcessors = new WorkProcessor[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
            workProcessors[i] = new WorkProcessor<>(ringBuffer, sequenceBarrier, workHandlers[i], exceptionHandler, workSequence);
        }
    }

    //Start the worker pool processing events in sequence.
    //@param executor providing threads for running the workers.
    //@return the {@link RingBuffer} used for the work queue.
    //@throws IllegalStateException if the pool has already been started and not halted yet
    public RingBuffer<T> start(final Executor executor) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }

        //Initialise the shared work sequence and every worker's sequence to the ring buffer's current cursor
        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);

        for (WorkProcessor<?> processor : workProcessors) {
            processor.getSequence().set(cursor);
            //Hand the WorkProcessor to the executor, which runs its run() method
            executor.execute(processor);
        }
        return ringBuffer;
    }
    ...
}

//EventProcessor that claims sequences from a shared workSequence and passes each claimed event to a WorkHandler.
public final class WorkProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    private final SequenceBarrier sequenceBarrier;
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    private final Sequence workSequence;
    //Handed to handlers that implement EventReleaseAware; release() sets this processor's sequence to Long.MAX_VALUE
    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };
    private final TimeoutHandler timeoutHandler;

    //Construct a {@link WorkProcessor}.
    //@param ringBuffer       to which events are published.
    //@param sequenceBarrier  on which it is waiting.
    //@param workHandler      is the delegate to which events are dispatched.
    //@param exceptionHandler to be called back when an error occurs
    //@param workSequence     from which to claim the next event to be worked on.  It should always be initialised as Sequencer#INITIAL_CURSOR_VALUE
    public WorkProcessor(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final WorkHandler<? super T> workHandler, final ExceptionHandler<? super T> exceptionHandler, final Sequence workSequence) {
        this.ringBuffer = ringBuffer;
        this.sequenceBarrier = sequenceBarrier;
        this.workHandler = workHandler;
        this.exceptionHandler = exceptionHandler;
        this.workSequence = workSequence;
        if (this.workHandler instanceof EventReleaseAware) {
            ((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser);
        }
        timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null;
    }

    //Consumes the data in the RingBuffer and records the progress in 'sequence'.
    //@throws IllegalStateException if this processor is already running
    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();
        notifyStart();

        //true when the previously claimed sequence has been handled and a new one must be claimed
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true) {
            try {
                if (processedSequence) {
                    processedSequence = false;
                    //Claim the next sequence from the shared workSequence; retry until the CAS succeeds
                    do {
                        nextSequence = workSequence.get() + 1L;
                        //Record the consumer's current progress
                        sequence.set(nextSequence - 1L);
                    } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                if (cachedAvailableSequence >= nextSequence) {
                    //Fetch the data to consume from the RingBuffer
                    event = ringBuffer.get(nextSequence);
                    //Invoke the consumer's onEvent() to consume the data
                    workHandler.onEvent(event);
                    processedSequence = true;
                } else {
                    //sequenceBarrier.waitFor() decides whether the consumer has to wait for the producer
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                //Leave the loop only when the processor is no longer marked as running
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                //handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }
        notifyShutdown();
        running.set(false);
    }
    ...
}
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;

    // Moves the ring buffer's gating sequences to the newest processors in a chain.
    // When processorSequences is non-empty: they are added as gating sequences, every
    // barrierSequence is removed from the gating set, and the barrier sequences are
    // un-marked as end-of-chain in the consumer repository.
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...
}

// Seven long fields declared ahead of the RingBuffer fields (cache-line padding).
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

// Holds the ring buffer state: the entries array, its size/mask and the sequencer.
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    // bufferSize - 1; used to map a sequence onto an array slot.
    private final long indexMask;
    // The ring array that stores the event objects.
    private final Object[] entries;
    protected final int bufferSize;
    // The sequencer represents the producer side of this RingBuffer.
    protected final Sequencer sequencer;

    // Validates the buffer size taken from the sequencer (>= 1 and a power of 2),
    // allocates the entries array and pre-fills it with events from the factory.
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        // Allocate the array: bufferSize slots plus BUFFER_PAD extra slots on each side.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // Pre-allocate every event up front.
        fill(eventFactory);
    }

    // Creates one event per slot, skipping the leading BUFFER_PAD slots.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    // Returns the event stored for the given sequence: the sequence is masked with
    // indexMask to obtain the slot, then read through Unsafe.
    // NOTE(review): REF_ARRAY_BASE / REF_ELEMENT_SHIFT are declared in the elided part;
    // presumably the padded array base offset and log2 of the reference size - confirm.
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    // Delegates directly to the sequencer.
    public void addGatingSequences(Sequence... gatingSequences) {
        sequencer.addGatingSequences(gatingSequences);
    }
    ...
}

public interface Sequencer extends Cursored, Sequenced {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    void addGatingSequences(Sequence... gatingSequences);
    ...
}

public abstract class AbstractSequencer implements Sequencer {
    // Field updater used to swap the "gatingSequences" array atomically.
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    ...
    // Passes this sequencer both as the holder of the array and as the cursor source.
    @Override
    public final void addGatingSequences(Sequence... gatingSequences) {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }
    ...
}

class SequenceGroups {
    // Appends sequencesToAdd to the Sequence[] field that updater manages on holder.
    // Each attempt copies the current array into a larger one, sets every new sequence
    // to the cursor's current value and tries to CAS the new array in; the loop retries
    // until the compareAndSet succeeds.
    static <T> void addSequences(final T holder, final AtomicReferenceFieldUpdater<T, Sequence[]> updater, final Cursored cursor, final Sequence... sequencesToAdd) {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;

        do {
            currentSequences = updater.get(holder);
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = cursor.getCursor();

            // New sequences go after the existing ones.
            int index = currentSequences.length;
            for (Sequence sequence : sequencesToAdd) {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        } while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        // Re-read the cursor once the array is installed and set the new sequences again,
        // so they reflect a cursor value that may have moved during the loop above.
        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd) {
            sequence.set(cursorSequence);
        }
    }
    ...
}

 

3.Disruptor的WaitStrategy等待策略分析

在生产者发布消息时,会调用WaitStrategy的signalAllWhenBlocking()方法唤醒阻塞的消费者。在消费者消费消息时,会调用WaitStrategy的waitFor()方法阻塞消费过快的消费者。

 

当然,不同的策略不一定就是阻塞消费者,比如BlockingWaitStrategy会通过ReentrantLock来阻塞消费者,而YieldingWaitStrategy则通过yield切换线程来实现让消费者无锁等待,即通过Thread的yield()方法切换线程让另一个线程继续执行自旋判断操作。

 

所以YieldingWaitStrategy等待策略的效率是最高的 + 最耗费CPU资源,当然效率次高、比较耗费CPU资源的是BusySpinWaitStrategy等待策略。

 

Disruptor提供了如下几种等待策略:

一.完全阻塞的等待策略BlockingWaitStrategy  二.切换线程自旋的等待策略YieldingWaitStrategy  三.繁忙自旋的等待策略BusySpinWaitStrategy  四.轻微阻塞的等待策略LiteBlockingWaitStrategy 也就是唤醒阻塞线程时,通过CAS避免并发获取锁的等待策略  五.最小睡眠 + 切换线程的等待策略SleepingWaitStrategy

总结:

为了达到最高效率,有大量CPU资源,可切换线程让多个线程自旋判断 为了保证高效的同时兼顾CPU资源,可以让单个线程自旋判断 为了保证比较高效更加兼顾CPU资源,可以切换线程自旋 + 最少睡眠 为了完全兼顾CPU资源不考虑效率问题,可以采用重入锁实现阻塞唤醒 为了完全兼顾CPU资源但考虑一点效率,可以采用重入锁 + CAS唤醒
//完全阻塞的等待策略 //Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier. //This strategy can be used when throughput and low-latency are not as important as CPU resource. public final class BlockingWaitStrategy implements WaitStrategy {     private final Lock lock = new ReentrantLock();     private final Condition processorNotifyCondition = lock.newCondition();          @Override     public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {         long availableSequence;         if ((availableSequence = cursorSequence.get()) < sequence) {             lock.lock();             try {                 while ((availableSequence = cursorSequence.get()) < sequence) {                     barrier.checkAlert();                     processorNotifyCondition.await();                 }             } finally {                 lock.unlock();             }         }         while ((availableSequence = dependentSequence.get()) < sequence) {             barrier.checkAlert();         }         return availableSequence;     }      @Override     public void signalAllWhenBlocking() {         lock.lock();         try {             processorNotifyCondition.signalAll();         } finally {             lock.unlock();         }     } }  //切换线程自旋的等待策略 //Yielding strategy that uses a Thread.yield() for EventProcessors waiting on a barrier after an initially spinning. //This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes. 
public final class YieldingWaitStrategy implements WaitStrategy {
    // Number of plain spins before the strategy starts calling Thread.yield().
    private static final int SPIN_TRIES = 100;

    // Loops until dependentSequence has reached the requested sequence and returns the
    // value read; the cursor parameter is not used by this strategy.
    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        int counter = SPIN_TRIES;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }
        return availableSequence;
    }

    // Nothing to signal: this strategy never blocks on a lock or condition.
    @Override
    public void signalAllWhenBlocking() {

    }

    // One wait step: checks the barrier for an alert, then either decrements the
    // remaining spin count or, once it has reached 0, yields the thread.
    // Returns the updated counter.
    private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
        barrier.checkAlert();
        if (0 == counter) {
            // Yield so that another thread can run; the caller's loop re-checks afterwards.
            Thread.yield();
        } else {
            --counter;
        }
        return counter;
    }
}

// Busy-spin wait strategy.
//Busy Spin strategy that uses a busy spin loop for EventProcessors waiting on a barrier.
//This strategy will use CPU resource to avoid syscalls which can introduce latency jitter.
//It is best used when threads can be bound to specific CPU cores.
public final class BusySpinWaitStrategy implements WaitStrategy {
    // Spins, checking the barrier for an alert on every iteration, until
    // dependentSequence has reached the requested sequence; returns the value read.
    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    // Nothing to signal: this strategy never blocks on a lock or condition.
    @Override
    public void signalAllWhenBlocking() {

    }
}

// Lightly blocking wait strategy (avoids taking the lock on wake-up when nobody is waiting).
//Variation of the BlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
//Shows performance improvements on microbenchmarks.
//However this wait strategy should be considered experimental as I have not full proved the correctness of the lock elision code. public final class LiteBlockingWaitStrategy implements WaitStrategy {     private final Lock lock = new ReentrantLock();     private final Condition processorNotifyCondition = lock.newCondition();     private final AtomicBoolean signalNeeded = new AtomicBoolean(false);      @Override     public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {         long availableSequence;         if ((availableSequence = cursorSequence.get()) < sequence) {             lock.lock();             try {                 do {                     signalNeeded.getAndSet(true);                     if ((availableSequence = cursorSequence.get()) >= sequence) {                         break;                     }                     barrier.checkAlert();                     processorNotifyCondition.await();                 } while ((availableSequence = cursorSequence.get()) < sequence);             } finally {                 lock.unlock();             }         }         while ((availableSequence = dependentSequence.get()) < sequence) {             barrier.checkAlert();         }         return availableSequence;     }      @Override     public void signalAllWhenBlocking() {         if (signalNeeded.getAndSet(false)) {             lock.lock();             try {                 processorNotifyCondition.signalAll();             } finally {                 lock.unlock();             }         }     } }  //最小睡眠 + 切换线程的等待策略SleepingWaitStrategy //Sleeping strategy that initially spins, then uses a Thread.yield(),  //and eventually sleep LockSupport.parkNanos(1) for the minimum number of nanos the OS  //and JVM will allow while the EventProcessors are waiting on a barrier. //This strategy is a good compromise between performance and CPU resource. 
//Latency spikes can occur after quiet periods. public final class SleepingWaitStrategy implements WaitStrategy {     private static final int DEFAULT_RETRIES = 200;     private final int retries;      public SleepingWaitStrategy() {         this(DEFAULT_RETRIES);     }          public SleepingWaitStrategy(int retries) {         this.retries = retries;     }      @Override     public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {         long availableSequence;         int counter = retries;                  while ((availableSequence = dependentSequence.get()) < sequence) {             counter = applyWaitMethod(barrier, counter);         }         return availableSequence;     }      @Override     public void signalAllWhenBlocking() {          }      private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {         barrier.checkAlert();         if (counter > 100) {             --counter;         } else if (counter > 0) {             --counter;             Thread.yield();         } else {             LockSupport.parkNanos(1L);         }         return counter;     } }

 

4.Disruptor的高性能原因

一.使用了环形结构 + 数组 + 内存预加载

二.使用了单线程写的方式并配合内存屏障

三.消除伪共享(填充缓存行)

四.序号栅栏和序号配合使用来消除锁

五.提供了多种不同性能的等待策略

 

5.Disruptor高性能之数据结构(内存预加载机制)

(1)RingBuffer使用环形数组来存储元素

(2)采用了内存预加载机制

 

(1)RingBuffer使用环形数组来存储元素

环形数组可以避免数组扩容和缩容带来的性能损耗。

 

(2)RingBuffer采用了内存预加载机制

初始化RingBuffer时,会将entries数组里的每一个元素都先new出来。比如RingBuffer的大小设置为8,那么初始化RingBuffer时,就会先将entries数组的8个元素分别指向新new出来的空的Event对象。往RingBuffer填充元素时,只是将对应的Event对象进行赋值。所以RingBuffer中的Event对象是一直存活着的,也就是说它能最小程度减少系统GC频率,从而提升性能。

// Single-producer usage example: builds a Disruptor, publishes five events and shuts down.
public class Main {
    public static void main(String[] args) {
        // Prepare the arguments.
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        // Arg 1: eventFactory, the factory that creates the event objects
        // Arg 2: ringBufferSize, the capacity of the ring buffer
        // Arg 3: executor, the thread pool (a custom pool is recommended), RejectedExecutionHandler
        // Arg 4: ProducerType, single or multiple producers
        // Arg 5: waitStrategy, the wait strategy
        // 1. Instantiate the Disruptor.
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );

        // 2. Register the event handler, i.e. associate the Disruptor with its consumer.
        disruptor.handleEventsWith(new OrderEventHandler());

        // 3. Start the Disruptor.
        disruptor.start();

        // 4. Obtain the container that actually stores the data: the RingBuffer.
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            // Publish the value into the ring buffer.
            producer.sendData(bb);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}

public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    ...

    //Create a new Disruptor.
    //@param eventFactory   the factory to create events in the ring buffer.
    //@param ringBufferSize the size of the ring buffer, must be power of 2.
    //@param executor       an Executor to execute event processors.
    //@param producerType   the claim strategy to use for the ring buffer.
    //@param waitStrategy   the wait strategy to use for the ring buffer.
    // Builds the RingBuffer via RingBuffer.create(...) and delegates to the private constructor.
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    //Private constructor helper
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
    ...
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    // Same value as Sequence.INITIAL_VALUE (described as -1 by the original note).
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    // Seven long fields declared after the inherited fields (cache-line padding).
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
    // Any other producer type results in an IllegalStateException.
    public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    //Create a new single producer RingBuffer with the specified wait strategy.
    // Pairs the buffer with a new SingleProducerSequencer.
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    //Construct a RingBuffer with the full option set.
    //@param eventFactory to newInstance entries for filling the RingBuffer
    //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer.
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    ...
}

// Holds the ring buffer state: the entries array, its size/mask and the sequencer.
abstract class RingBufferFields<E> extends RingBufferPad {
    // bufferSize - 1.
    private final long indexMask;
    // The ring array that stores the event objects.
    private final Object[] entries;
    protected final int bufferSize;
    // The sequencer represents the producer side of this RingBuffer.
    protected final Sequencer sequencer;
    ...

    // Validates the buffer size taken from the sequencer (>= 1 and a power of 2),
    // allocates the entries array and pre-fills it with events from the factory.
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        // Allocate the array: bufferSize slots plus BUFFER_PAD extra slots on each side.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // Pre-allocate every event up front.
        fill(eventFactory);
    }

    // Creates one event per slot, skipping the leading BUFFER_PAD slots.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            // Store a fresh, not-yet-populated event object.
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    ...
}

// Seven long fields declared ahead of the RingBuffer fields (cache-line padding).
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

 

6.Disruptor高性能之内核(使用单线程写)

Disruptor的RingBuffer之所以可以做到完全无锁是因为单线程写。离开单线程写,没有任何技术可以做到完全无锁。Redis和Netty等高性能技术框架也是利用单线程写来实现的。

 

具体就是:单生产者时,固然只有一个生产者线程在写。多生产者时,每个生产者线程都只会写各自获取到的Sequence序号对应的环形数组的元素,从而使得多个生产者线程相互之间不会产生写冲突。

 

7.Disruptor高性能之系统内存优化(内存屏障)

要正确实现无锁,还需要另外一个关键技术——内存屏障。对应到Java语言,就是volatile变量与Happens Before语义。

 

内存屏障:Linux的smp_wmb()/smp_rmb()。

 

8.Disruptor高性能之系统缓存优化(消除伪共享)

CPU缓存是以缓存行(Cache Line)为单位进行存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节,最常见的缓存行大小是64个字节。

 

当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会对这个缓存行形成竞争,从而无意中影响彼此性能,这就是伪共享。

 

消除伪共享:利用了空间换时间的思想。

 

由于代表着一个序号的Sequence其核心字段value是一个long型变量(占8个字节),所以有可能会出现多个Sequence对象的value变量共享同一个缓存行。因此,需要对Sequence对象的value变量消除伪共享。具体做法就是:在Sequence对象的value变量前后各增加7个long型变量。

 

注意:伪共享与Sequence的静态变量无关,因为静态变量本身就是多个线程共享的,而不是多个线程隔离独立的。

// Seven long fields declared before the value field (left-hand padding).
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

// Holds the actual sequence value as a volatile long.
class Value extends LhsPadding {
    protected volatile long value;
}

// Seven long fields declared after the value field (right-hand padding).
class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

// A padded long counter; reads go through the volatile field, writes through Unsafe.
public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    // Offset of Value.value, resolved once in the static initializer below.
    private static final long VALUE_OFFSET;

    static {
        UNSAFE = Util.getUnsafe();
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        } catch (final Exception e) {
            // Any failure to resolve the field is rethrown unchecked with its cause.
            throw new RuntimeException(e);
        }
    }

    //Create a sequence initialised to -1.
    public Sequence() {
        this(INITIAL_VALUE);
    }

    //Create a sequence with a specified initial value.
    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    //Perform a volatile read of this sequence's value.
    public long get() {
        return value;
    }

    //Perform an ordered write of this sequence.
    //The intent is a Store/Store barrier between this write and any previous store.
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }
    ...
}

 

9.Disruptor高性能之序号获取优化(自旋 + CAS)

生产者投递Event时会使用"long sequence = ringBuffer.next()"获取序号,而序号栅栏SequenceBarrier会和序号Sequence搭配起来一起使用,用来协调和管理消费者和生产者的工作节奏,避免锁的使用。

 

各个消费者和生产者都持有自己的序号,这些序号需满足如下条件以避免生产者速度过快,将还没来得及消费的消息覆盖。

一.消费者序号数值不能大于生产者序号数值 二.消费者序号数值不能大于其前置消费者的序号数值 三.生产者序号数值减去RingBuffer的大小后,不能大于消费者中最小的序号数值(即生产者最多只能领先最慢的消费者一圈)

高性能的序号获取优化:为避免生产者每次执行next()获取序号时,都要查询消费者的最小序号,Disruptor采取了自旋 + LockSupport挂起线程 + 缓存最小序号 + CAS来优化。既避免了锁,也尽量在不耗费CPU的情况下提升了性能。

 

单生产者的情况下,只有一个线程添加元素,此时没必要使用锁。多生产者的情况下,会有多个线程并发获取Sequence序号添加元素,此时会通过自旋 + CAS避免锁。

// Example producer: copies a long out of a ByteBuffer into the next free OrderEvent.
public class OrderEventProducer {
    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    // Publishes the long stored at offset 0 of data as one event.
    public void sendData(ByteBuffer data) {
        // 1. When sending, first claim the next available sequence from the ring buffer.
        long sequence = ringBuffer.next();
        try {
            // 2. Look up the OrderEvent stored for that sequence.
            // Note: the OrderEvent obtained here is an "empty" object with no value assigned yet.
            OrderEvent event = ringBuffer.get(sequence);
            // 3. Assign the actual value.
            event.setValue(data.getLong(0));
        } finally {
            // 4. Publish; done in finally so a claimed sequence is always published.
            ringBuffer.publish(sequence);
        }
    }
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    // Same value as Sequence.INITIAL_VALUE (described as -1 by the original note).
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    // Seven long fields declared after the inherited fields (cache-line padding).
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Increment and return the next sequence for the ring buffer.
    //Calls of this method should ensure that they always publish the sequence afterward.
    //E.g.
    //  long sequence = ringBuffer.next();
    //  try {
    //      Event e = ringBuffer.get(sequence);
    //      ...
    //  } finally {
    //      ringBuffer.publish(sequence);
    //  }
    //@return The next sequence to publish to.
    @Override
    public long next() {
        return sequencer.next();
    }

    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }

    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that
    //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        // Delegates to elementAt() inherited from RingBufferFields.
        return elementAt(sequence);
    }
    ...
}

// Seven long fields declared ahead of the RingBuffer fields (cache-line padding).
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

// Holds the ring buffer state: the entries array, its size/mask and the sequencer.
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    // bufferSize - 1; used to map a sequence onto an array slot.
    private final long indexMask;
    // The ring array that stores the event objects.
    private final Object[] entries;
    protected final int bufferSize;
    // The sequencer represents the producer side of this RingBuffer.
    protected final Sequencer sequencer;

    // Validates the buffer size taken from the sequencer (>= 1 and a power of 2),
    // allocates the entries array and pre-fills it with events from the factory.
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        // Allocate the array: bufferSize slots plus BUFFER_PAD extra slots on each side.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // Pre-allocate every event up front.
        fill(eventFactory);
    }

    // Creates one event per slot, skipping the leading BUFFER_PAD slots.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    // Returns the event stored for the given sequence: the sequence is masked with
    // indexMask to obtain the slot, then read through Unsafe.
    // NOTE(review): REF_ARRAY_BASE / REF_ELEMENT_SHIFT are declared in the elided part;
    // presumably the padded array base offset and log2 of the reference size - confirm.
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}
public abstract class AbstractSequencer implements Sequencer {
    // Field updater used to swap the "gatingSequences" array atomically.
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    // Size of the ring array.
    protected final int bufferSize;
    // Wait strategy.
    protected final WaitStrategy waitStrategy;
    // Progress of the producer (the published cursor).
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // Each Sequence here corresponds to one consumer (an EventHandler or a WorkHandler).
    // They are added through SEQUENCE_UPDATER by RingBuffer.addGatingSequences() when
    // Disruptor.handleEventsWith() and similar methods run.
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...

    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy
    // Throws IllegalArgumentException when bufferSize is < 1 or not a power of 2.
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

// Seven long fields declared before the single-producer fields (padding).
abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    // The producer's current (last claimed) sequence; starts at Sequence.INITIAL_VALUE.
    protected long nextValue = Sequence.INITIAL_VALUE;
    // Cached minimum consumer sequence; starts at Sequence.INITIAL_VALUE.
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    // Seven long fields declared after the single-producer fields (padding).
    protected long p1, p2, p3, p4, p5, p6, p7;

    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize   the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    ...

    // Claims the next single sequence.
    @Override
    public long next() {
        return next(1);
    }

    // Claims the next n sequences and returns the highest one claimed.
    // Parks in 1ns steps while claiming would move more than bufferSize ahead of the
    // slowest gating (consumer) sequence. Throws IllegalArgumentException when n < 1.
    @Override
    public long next(int n) {
        // Walkthrough below: sequences start at -1 and next() is called five times with n = 1.
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        // nextValue is the current sequence (field of SingleProducerSequencerFields).
        // 1st call: nextValue = -1
        // 2nd call: nextValue = 0
        // 3rd call: nextValue = 1
        // 4th call: nextValue = 2
        // 5th call: nextValue = 3
        long nextValue = this.nextValue;
        // 1st call: nextSequence = -1 + 1 = 0
        // 2nd call: nextSequence = 0 + 1 = 1
        // 3rd call: nextSequence = 1 + 1 = 2
        // 4th call: nextSequence = 2 + 1 = 3
        // 5th call: nextSequence = 3 + 1 = 4
        long nextSequence = nextValue + n;
        // wrapPoint tells whether the producer has wrapped around the ring:
        // negative means it has not wrapped yet, non-negative means it has.
        // Assume bufferSize = 3 (illustrative only: the constructor rejects sizes that
        // are not a power of 2):
        // 1st call: wrapPoint = 0 - 3 = -3, not wrapped
        // 2nd call: wrapPoint = 1 - 3 = -2, not wrapped
        // 3rd call: wrapPoint = 2 - 3 = -1, not wrapped
        // 4th call: wrapPoint = 3 - 3 = 0, wrapped
        // 5th call: wrapPoint = 4 - 3 = 1, wrapped
        long wrapPoint = nextSequence - bufferSize;
        // cachedGatingSequence caches the minimum consumer sequence so that next() does
        // not have to scan the consumers on every call.
        // 1st call: cachedGatingSequence = -1
        // 2nd call: cachedGatingSequence = -1
        // 3rd call: cachedGatingSequence = -1
        // 4th call: cachedGatingSequence = -1
        // 5th call: cachedGatingSequence = 1 (given the assumption made for the 4th call below)
        long cachedGatingSequence = this.cachedValue;

        // On the 4th call wrapPoint exceeds cachedGatingSequence, so this branch runs.
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            // The minimum consumer sequence.
            long minSequence;
            // Wait loop:
            // Util.getMinimumSequence(gatingSequences, nextValue) finds the smallest
            // consumer sequence (bounded above by nextValue).
            // While wrapPoint is greater than that minimum the producer must wait:
            // it parks for 1ns and checks again.
            // On the 4th call: nextValue = 2, wrapPoint = 0; if the consumers have not
            // consumed anything yet (sequence -1) the producer has to park.
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                //TODO: Use waitStrategy to spin?
                LockSupport.parkNanos(1L);
            }
            // cachedValue takes the minimum consumer sequence just observed.
            // On the 4th call, assuming minSequence is 1, cachedValue becomes 1.
            this.cachedValue = minSequence;
        }
        // After the 1st call nextValue becomes 0
        // After the 2nd call nextValue becomes 1
        // After the 3rd call nextValue becomes 2
        // After the 4th call nextValue becomes 3
        // After the 5th call nextValue becomes 4
        this.nextValue = nextSequence;
        // 1st call returns nextSequence = 0
        // 2nd call returns nextSequence = 1
        // 3rd call returns nextSequence = 2
        // 4th call returns nextSequence = 3
        // 5th call returns nextSequence = 4
        return nextSequence;
    }

    @Override
    public void publish(long sequence) {
        // Set the producer's cursor to the published sequence.
        cursor.set(sequence);
        // Notify consumers through the wait strategy.
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

public final class Util {
    ...
    //Get the minimum sequence from an array of {@link com.lmax.disruptor.Sequence}s.
    //@param sequences to compare.
    //@param minimum   an initial default minimum. If the array is empty this value will be returned.
    //@return the smaller of minimum sequence value found in sequences and minimum; minimum if sequences is empty
    public static long getMinimumSequence(final Sequence[] sequences, long minimum) {
        for (int i = 0, n = sequences.length; i < n; i++) {
            long value = sequences[i].get();
            minimum = Math.min(minimum, value);
        }
        return minimum;
    }
    ...
}

public final class MultiProducerSequencer extends AbstractSequencer {
    ...
    // Claims the next single sequence.
    @Override
    public long next() {
        return next(1);
    }

    // Claims the next n sequences and returns the highest one claimed.
    // Retries in a loop: either refreshes the cached gating sequence (parking 1ns when
    // the buffer is full) or tries to CAS the cursor from current to next.
    // Throws IllegalArgumentException when n < 1.
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            // Read the current producer cursor.
            current = cursor.get();
            next = current + n;
            // wrapPoint tells whether the producer has wrapped around the ring:
            // negative means it has not wrapped yet, non-negative means it has.
            long wrapPoint = next - bufferSize;
            // cachedGatingSequence caches the minimum consumer sequence so that next()
            // does not have to scan the consumers on every call.
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                // gatingSequence is the minimum consumer sequence (bounded above by current).
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                if (wrapPoint > gatingSequence) {
                    //TODO, should we spin based on the wait strategy?
                    LockSupport.parkNanos(1);
                    continue;
                }
                // Refresh the cache; the loop then runs again before attempting the CAS.
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                // The claim succeeded; a failed CAS simply retries with a fresh cursor value.
                break;
            }
        } while (true);
        return next;
    }
    ...
}

 

发表评论

评论已关闭。

相关文章