Analysis of the Core Principles of the High-Performance Lock-Free Queue Disruptor and Its Application in the iTheme Business

1. Introduction to iTheme and Disruptor

iTheme is vivo's theme store app. Users can download themes, wallpapers, fonts, and more to change and customize their phone's interface style with one click.

Disruptor is a high-performance in-memory queue developed by LMAX, a British foreign exchange trading company. It passes messages between threads within a single system, which distinguishes it from distributed message queues such as RocketMQ and Kafka. A system LMAX built on Disruptor is reported to handle 6 million orders per second. Many well-known projects, including Apache Storm, Camel, and Log4j 2, use Disruptor for its performance. It also has many applications inside vivo. For example, custom monitoring uses a Disruptor queue to temporarily buffer the data reported by the monitoring SDK, and iTheme uses one to collect local in-memory metric data.

Next, we introduce Disruptor from the following angles: how it compares with the JDK's built-in queues, its core concepts, a usage demo, its core source code, the principles behind its high performance, and its application in the iTheme business.

2. Comparison with the JDK's Built-in Queues

Let's compare the JDK's built-in queues with Disruptor. Queues are generally implemented on top of one of three structures: arrays, linked lists, or heaps. Heaps are typically used for queues with priority semantics, so we set them aside here. ConcurrentLinkedQueue and LinkedTransferQueue are unbounded. In systems with particularly strict stability requirements, only bounded queues are acceptable, because a producer that outpaces its consumers could otherwise exhaust memory. That leaves ArrayBlockingQueue and LinkedBlockingQueue as the remaining thread-safe options in the JDK.
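To make the bounded-versus-unbounded trade-off concrete, here is a minimal JDK-only sketch (the class and method names are illustrative, not from the original article). When a producer offers more items than an ArrayBlockingQueue can hold, the extra offers are rejected instead of growing the heap. The sketch uses the non-blocking offer so the effect is easy to observe. Calling put instead would block the producer until space frees up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: shows that a bounded queue rejects excess items
class BoundedQueueDemo {
    // Offers `attempts` items to a queue of the given capacity and
    // returns how many were accepted
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() does not block: it returns false once the queue is full
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // A producer faster than its (absent) consumer: only 2 of 5 items fit
        System.out.println(acceptedOffers(2, 5)); // prints 2
    }
}
```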

LinkedBlockingQueue is built on a linked list. Its nodes are not contiguous in memory, which makes it unfriendly to the CPU cache. It also relies on locks, which hurts performance. ArrayBlockingQueue has the same locking problem. It additionally suffers from false sharing, because its frequently updated index and count fields are likely to share a cache line, and this degrades performance further. Disruptor, introduced below, is an array-based, bounded, lock-free queue. It follows the principle of spatial locality to make good use of the CPU cache, and it avoids false sharing. Together these properties greatly improve its performance.

3. Disruptor Core Concepts

As shown in the figure below, we can first get an intuitive picture of Disruptor from the perspective of data flow. Disruptor supports both single-producer and multi-producer modes, with one or many consumers. Consumers can work in two modes. In broadcast mode, HandlerA and HandlerB each process every message. In cluster mode, HandlerA and HandlerB each process a share of the messages. After HandlerA and HandlerB finish, they hand the message over to HandlerC for further processing.

The following is an introduction to the core concepts of Disruptor based on the official architecture diagram of Disruptor:

  • RingBuffer: As mentioned earlier, Disruptor is a high-performance memory queue, and RingBuffer is the data structure of the memory queue. It is a circular array and a carrier for data.
  • Producer: Disruptor is a typical producer-consumer model. Therefore, the producer is the core component of the Disruptor programming model, which can be a single producer or multiple producers.
  • Event: A specific data entity. The producer produces the Event and stores it in the RingBuffer. The consumer consumes it from the RingBuffer for logical processing.
  • Event Handler: Developers need to implement the EventHandler interface to define consumer processing logic.
  • Wait Strategy: Wait strategy defines how to wait when the consumer cannot obtain data from the RingBuffer.
  • Event Processor: The event-loop processor. EventProcessor extends the Runnable interface, and the run method of its implementations contains a while loop. This loop continuously tries to obtain data from the RingBuffer and hands it over to the EventHandler for processing.
  • Sequence: The RingBuffer is an array. Sequences (sequence numbers) record how far the producer has produced and how far each consumer has consumed.
  • Sequencer: Has two implementations, single-producer and multi-producer. Before a producer publishes data, it must first claim an available sequence number. The Sequencer coordinates these claims.
  • Sequence Barrier: Determines how far a consumer may read, based on the producer's cursor and the progress of any consumers it depends on. See the analysis below.

4. Disruptor Demo

4.1 Defining Events

An Event is a concrete data entity. Producers create Events and store them in the RingBuffer, and consumers take them from the RingBuffer for logical processing. An Event is an ordinary Java object and does not need to implement any interface defined by Disruptor.

```java
public class OrderEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}
```

4.2 Defining EventFactory

Creates Event objects. The RingBuffer calls it during initialization to pre-fill every slot.

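```java
import com.lmax.disruptor.EventFactory;

public class OrderEventFactory implements EventFactory<OrderEvent> {
    // Called once per slot when the RingBuffer is created
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}
```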

4.3 Defining Producers

As you can see, the producer mainly holds a RingBuffer object, through which it publishes data. A few points to note:

  • The RingBuffer maintains an Object array, which is the container that actually stores the data. When the RingBuffer is initialized, every slot of this array is filled with an empty Event created by the EventFactory. No Events need to be created at runtime, which improves performance. As a result, fetching the Event at a given sequence number returns an empty object, which must be assigned values before it is published.
  • The producer obtains an available sequence number through the RingBuffer's next method. If the RingBuffer has no free slot, this call blocks.
  • Once a sequence number has been obtained through next, it must always be published with the publish method. This is why publish is called in a finally block.
```java
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class OrderEventProducer {
    private final RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(ByteBuffer data) {
        // 1. When sending a message, first claim an available sequence number from the ringBuffer
        long sequence = ringBuffer.next();
        try {
            // 2. The OrderEvent obtained here is a preallocated object that has not been assigned yet
            OrderEvent event = ringBuffer.get(sequence);
            // 3. Assign the actual value
            event.setValue(data.getLong(0));
        } finally {
            // 4. Publish the sequence so consumers can see it
            ringBuffer.publish(sequence);
        }
    }
}
```

4.4 Defining Consumers

Consumers can implement the EventHandler interface and define their own processing logic.

```java
import com.lmax.disruptor.EventHandler;

public class OrderEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Consumer: " + event.getValue());
    }
}
```

4.5 Main Process

  • First, create a Disruptor object. Disruptor has several overloaded constructors. They accept the following arguments:
    • an EventFactory;
    • ringBufferSize, which must be a power of 2;
    • an executor, which runs the EventHandlers' processing logic. Each EventHandler gets one thread, and that thread serves only that EventHandler;
    • a producer type, either single or multiple producers;
    • a wait strategy, here BlockingWaitStrategy.
    Creating the Disruptor also creates a RingBuffer of the specified size internally.
  • Once the Disruptor object exists, add consumer EventHandlers through it.
  • When the Disruptor starts, each EventHandler added in the previous step is wrapped in an EventProcessor (which implements Runnable). The EventProcessor is then submitted to the executor given to the constructor. Because the EventProcessor's run method is a while loop that continuously tries to obtain data from the RingBuffer, each EventHandler occupies one thread, and that thread serves only that EventHandler.
  • Get the RingBuffer held by the Disruptor and create a producer with it. The running EventProcessor tasks pick up the data published through the RingBuffer and hand it to the EventHandlers for processing.
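```java
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorDemo {
    public static void main(String[] args) {
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(1);
        /*
         * 1. Instantiate the Disruptor:
         *    1) eventFactory: factory for message (event) objects
         *    2) ringBufferSize: capacity of the container
         *    3) executor: runs the consumers' (EventHandlers') processing loops
         *    4) ProducerType: single or multiple producers
         *    5) waitStrategy: wait strategy
         */
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory, ringBufferSize,
                executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        // 2. Register the consumer
        disruptor.handleEventsWith(new OrderEventHandler());
        // 3. Start the disruptor
        disruptor.start();
        // 4. Get the container that actually stores the data: the RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            producer.sendData(bb);
        }
        // Waits until all published events are processed, then halts the processors
        disruptor.shutdown();
        executor.shutdown();
    }
}
```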

5. Disruptor source code analysis

The analysis below uses the single-producer and multi-producer cases, each with a single consumer, as examples.

5.1 Creating a Disruptor

First, RingBuffer is created using the passed in parameters, and the created RingBuffer and the passed in executor are handed over to the Disruptor object.

```java
public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final Executor executor,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}
```

Next, we analyze the creation process of RingBuffer, which is divided into single producer and multiple producers.

```java
public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE: // single producer
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI: // multiple producers
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}
```

In both the single-producer and multi-producer cases, a RingBuffer object is eventually created. Only the Sequencer passed to it differs. Internally, the RingBuffer creates an Object array to store the Event data. A few points to note:

  • The RingBuffer is implemented with an array. After the array is created, the fill method calls the EventFactory for every slot to initialize its element. Later, these elements are fetched directly by index and only their properties are reassigned. This avoids repeatedly creating Event objects and the frequent GC that would follow.
  • All elements of the RingBuffer array are created at once during initialization, so their memory addresses are likely to be contiguous. Consumption therefore benefits from spatial locality: after consuming the first Event, the consumer soon consumes the second. When the first Event is loaded, the CPU also loads the adjacent memory, which likely holds the following Events, into the cache. By the time the second Event is consumed, it is already in the CPU cache and does not need to be loaded from memory, which can greatly improve performance.
```java
public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer) {
    // Some code omitted...
    // Allocate two extra padding regions, one at each end, so the array's payload
    // is never loaded into the same cache line as other members
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        // BUFFER_PAD + i is the real array index
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}
```
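The preallocation idea can be sketched without the Disruptor library. The class below is a hypothetical, simplified stand-in for RingBufferFields. It pads both ends of the array, using an arbitrary PAD of 4 slots rather than Disruptor's computed BUFFER_PAD. It fills every slot once from a factory and then hands out the same objects repeatedly. Plain array indexing replaces Unsafe here.

```java
import java.util.function.Supplier;

// Hypothetical, simplified model of RingBufferFields' preallocation (not Disruptor code)
class PreallocatedRing<E> {
    private static final int PAD = 4; // illustrative padding; Disruptor derives BUFFER_PAD itself
    private final Object[] entries;
    private final int indexMask;

    PreallocatedRing(Supplier<E> factory, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        // Padding slots at both ends stay null
        this.entries = new Object[bufferSize + 2 * PAD];
        // Fill once at construction: producers later reuse these objects instead of allocating
        for (int i = 0; i < bufferSize; i++) {
            entries[PAD + i] = factory.get();
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        // PAD + slot is the real array index, mirroring BUFFER_PAD + i in fill()
        return (E) entries[PAD + (int) (sequence & indexMask)];
    }

    public static void main(String[] args) {
        PreallocatedRing<StringBuilder> ring = new PreallocatedRing<>(StringBuilder::new, 8);
        // Sequences 3 and 11 map to the same slot, so the same object is reused
        System.out.println(ring.get(3) == ring.get(11)); // prints true
    }
}
```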

5.2 Adding Consumers

The core code for adding consumers is shown below. Each EventHandler is wrapped in a BatchEventProcessor, which is then added to consumerRepository. When the Disruptor starts, it iterates over all BatchEventProcessors (which implement the Runnable interface) in consumerRepository and submits them to the thread pool.

```java
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    // Calling handleEventsWith directly on the disruptor passes an empty Sequence array
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers) {
    // Collect the sequences of the consumers being added
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    // Consumers in this batch are added after the same node, so they share this barrier
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    // Create one BatchEventProcessor per EventHandler
    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
        // Register it in the consumer repository
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    // Update the gating sequences (the producer only needs to track the last consumers in the chain)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
```

After creating the Disruptor object, you can add EventHandlers through it. Note one detail here: when handleEventsWith is called directly on the Disruptor object, an empty Sequence array is passed. What does this mean? The createEventProcessors parameter that receives this empty array is named barrierSequences, meaning "barrier sequence numbers". How should this parameter be understood?

For example, the following code adds two handlers to the Disruptor, handlerA and handlerB, for serial consumption: handlerA must consume an Event before handlerB can consume it. handlerA has no preceding consumer, so it can consume up to wherever the producer has produced, and its barrierSequences is an empty array. handlerB's preceding consumer is handlerA, so its barrierSequences holds handlerA's consumption progress. This means handlerB's progress can never exceed handlerA's.


 disruptor.handleEventsWith(handlerA).handleEventsWith(handlerB);

If the handlers are added as follows, handlerA and handlerB each consume every Event, similar to broadcast consumption in MQ. handlerC's barrierSequences array contains the consumption progress of both handlerA and handlerB, which is why barrierSequences is an array. When handlerC consumes data, it uses the smaller of A's and B's progress. For example, if A has consumed up to 6 and B up to 4, C can only consume data up to index 4. This is what barrierSequences means.

 disruptor.handleEventsWith(handlerA, handlerB).handleEventsWith(handlerC);
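The rule that handlerC follows the slower of handlerA and handlerB can be sketched in a few lines. The helper below is hypothetical and written in the spirit of Disruptor's Util.getMinimumSequence. It uses plain long values instead of Sequence objects.

```java
// Hypothetical helper, not Disruptor's API: computes how far a consumer may read
class DependencyBarrier {
    // Smallest value among the given sequences, or defaultValue if there are none
    static long minimumSequence(long[] sequences, long defaultValue) {
        long minimum = defaultValue;
        for (long s : sequences) {
            minimum = Math.min(minimum, s);
        }
        return minimum;
    }

    // A consumer without upstream consumers may read up to the producer cursor;
    // otherwise it is bounded by the slowest upstream consumer
    static long highestConsumable(long cursor, long[] upstreamProgress) {
        if (upstreamProgress.length == 0) {
            return cursor;
        }
        return minimumSequence(upstreamProgress, Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        long cursor = 6;
        // handlerA and handlerB depend only on the producer
        System.out.println(highestConsumable(cursor, new long[0])); // prints 6
        // handlerC depends on A (progress 6) and B (progress 4)
        System.out.println(highestConsumable(cursor, new long[] {6, 4})); // prints 4
    }
}
```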



5.3 Starting the Disruptor

The startup logic of the Disruptor is relatively simple. It traverses the EventProcessors (implementing the Runnable interface) collected in the consumerRepository and submits them to the executor specified when creating the Disruptor. The run method of the EventProcessor will start a while loop, constantly trying to obtain data from the RingBuffer for consumption.

 disruptor.start();
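```java
// Disruptor.start(): submit every registered consumer to the executor
public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }
    return ringBuffer;
}

// EventProcessorInfo.start(...): run the event processor's loop on the executor
public void start(final Executor executor) {
    executor.execute(eventprocessor);
}
```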

5.4 Publishing Data

Before analyzing the source code of Disruptor's data publishing, let's review the overall process of data publishing.

  • Call the next method to obtain an available sequence number. This call may block.
  • Use that sequence number to get the corresponding Event from the RingBuffer. Since all Events in the RingBuffer were created during initialization, this is an empty object.
  • Assign business values to that empty object.
  • Publish the sequence number in a finally block. This marks the data at that sequence number as actually produced, and every sequence number obtained from next must be published this way.
```java
public void sendData(ByteBuffer data) {
    long sequence = ringBuffer.next();
    try {
        OrderEvent event = ringBuffer.get(sequence);
        event.setValue(data.getLong(0));
    } finally {
        ringBuffer.publish(sequence);
    }
}
```

5.4.1 Get the Sequence Number

By default, the next method claims one sequence number. nextValue is the last sequence number already claimed. nextSequence is the result of claiming n more sequence numbers on top of it (n is 1 here). cachedValue is the cached minimum consumption progress across the consumers.

Assume a RingBuffer of size 8 in which the data at index 6 has been published (nextValue is 6) and the consumer has not started consuming (cachedValue and cachedGatingSequence are -1). The producer now wants to publish more data and calls next() to claim sequence number 7 (nextSequence is 7). The computed wrapPoint is 7 - 8 = -1, which equals cachedGatingSequence, so publishing can continue, as shown in the left figure. Finally, nextValue is set to 7, indicating that the producer now occupies the position with sequence number 7.

The producer then calls next() again to claim sequence number 8, which maps to array index 0. Now nextValue is 7, nextSequence is 8, and wrapPoint is 0. The consumer has not consumed anything yet (cachedGatingSequence is -1), so wrapPoint is greater than cachedGatingSequence. The if condition in next therefore holds, and LockSupport.parkNanos is called to block until consumers make progress. The getMinimumSequence method returns the minimum consumption progress across multiple consumers.


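```java
public long next() {
    return next(1);
}

public long next(int n) {
    /*
     * Cache of the highest sequence already claimed (initially -1). The return value
     * nextSequence is where the producer writes next and is stored into nextValue,
     * so on the next call nextValue marks data already produced and nextSequence
     * is the sequence being claimed.
     */
    long nextValue = this.nextValue;
    // The sequence claimed by this call
    long nextSequence = nextValue + n;
    // Wrap point, where the producer could lap the slowest consumer:
    // claimed sequence - buffer size. If it exceeds the slowest consumer's progress,
    // the producer has caught up with that consumer and must wait
    long wrapPoint = nextSequence - bufferSize;
    // Last cached minimum gating sequence (progress of the slowest consumer)
    long cachedGatingSequence = this.cachedValue;

    // wrapPoint > cachedGatingSequence means the producer has lapped a consumer (the buffer
    // is full), so fetch the consumers' latest progress to check whether the queue is full
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        // Insert a StoreLoad barrier for visibility. publish uses set()/putOrderedLong,
        // which does not guarantee consumers see published data promptly; before claiming
        // more space we must make sure consumers can consume what was published
        cursor.setVolatile(nextValue);

        long minSequence;
        // minSequence is the minimum over all consumers; production continues only
        // once every consumer has moved past the wrap point
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            LockSupport.parkNanos(1L);
        }
        // Cache the latest minimum consumer progress
        this.cachedValue = minSequence;
    }

    // Only a plain field is written here, not a volatile one: space has been claimed but no
    // data published yet, so consumers need not see it. They only see published sequences
    this.nextValue = nextSequence;
    return nextSequence;
}
```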
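The sketch below checks the wrap-point arithmetic against the worked example above. It is a hypothetical, single-threaded model of the same capacity check. Unlike the real next(n), it does not park. Instead, it takes the slowest consumer's progress as a parameter and returns FULL (an illustrative constant) when a claim would overwrite unconsumed data.

```java
// Hypothetical, single-threaded model of SingleProducerSequencer.next(n)'s capacity check
class WrapPointDemo {
    static final long FULL = -1L; // illustrative "no capacity" result; the real code blocks instead

    private final int bufferSize;
    private long nextValue = -1L;   // highest sequence already claimed by the producer
    private long cachedValue = -1L; // cached progress of the slowest consumer

    WrapPointDemo(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    long tryNext(int n, long slowestConsumer) {
        if (n < 1 || n > bufferSize) {
            throw new IllegalArgumentException("n must be > 0 and <= bufferSize");
        }
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        if (wrapPoint > cachedValue || cachedValue > nextValue) {
            // Refresh the cache, as Util.getMinimumSequence(gatingSequences, nextValue) would
            cachedValue = Math.min(slowestConsumer, nextValue);
            if (wrapPoint > cachedValue) {
                return FULL; // claiming would overwrite a slot the consumer has not read yet
            }
        }
        nextValue = nextSequence;
        return nextSequence;
    }

    public static void main(String[] args) {
        WrapPointDemo producer = new WrapPointDemo(8);
        System.out.println(producer.tryNext(7, -1)); // 6: sequences 0..6 claimed
        System.out.println(producer.tryNext(1, -1)); // 7: wrapPoint -1 equals the cached -1
        System.out.println(producer.tryNext(1, -1)); // -1 (FULL): wrapPoint 0 > -1
        System.out.println(producer.tryNext(1, 0));  // 8: the consumer finished sequence 0
    }
}
```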

5.4.2 Get Event by Sequence Number

The Event object at the specified sequence number is obtained directly through the Unsafe utility class. It is still an empty object, so the next step is to assign business values to it. After the assignment, the publish method is called to publish the data.

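```java
OrderEvent event = ringBuffer.get(sequence);

// RingBuffer.get
public E get(long sequence) {
    return elementAt(sequence);
}

// RingBufferFields.elementAt: sequence & indexMask is the slot index (indexMask = bufferSize - 1),
// and shifting by REF_ELEMENT_SHIFT turns it into a byte offset from REF_ARRAY_BASE
protected final E elementAt(long sequence) {
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
```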
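The address arithmetic in elementAt can be reproduced with plain numbers. The sketch below is hypothetical and uses assumed layout values: a 16-byte array header, 4-byte compressed references (so the shift is 2), and 32 padding slots. On a real JVM these values come from Unsafe. In Disruptor 3.x, REF_ARRAY_BASE already includes the BUFFER_PAD offset, and the sketch mirrors that. It also shows why ringBufferSize must be a power of 2: only then does sequence & (bufferSize - 1) equal sequence % bufferSize.

```java
// Hypothetical illustration of RingBufferFields.elementAt's offset arithmetic
class SlotAddressing {
    // For a power-of-2 bufferSize and sequence >= 0, this equals sequence % bufferSize
    static int slotIndex(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }

    // arrayBase: offset of element 0; refShift: log2(reference size); pad: padding slots in front
    static long byteOffset(long sequence, int bufferSize, long arrayBase, int refShift, int pad) {
        long refArrayBase = arrayBase + ((long) pad << refShift); // like REF_ARRAY_BASE
        return refArrayBase + ((long) slotIndex(sequence, bufferSize) << refShift);
    }

    public static void main(String[] args) {
        System.out.println(slotIndex(9, 8)); // prints 1
        // Assumed layout: 16-byte header, 4-byte references (shift 2), 32 padding slots
        System.out.println(byteOffset(9, 8, 16, 2, 32)); // prints 148
    }
}
```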

5.4.3 Publishing Data

After the producer obtains an available sequence number, it assigns the business values to the empty Event at that sequence number and then calls the RingBuffer's publish method. The RingBuffer delegates the actual publishing to the sequencer it holds (single and multiple producers use different sequencers). Single-producer publishing is simple: it updates the cursor and then wakes up waiting consumers. Note the difference between these two values. The cursor is the producer's progress, meaning the position up to which data has actually been published. nextSequence in the next method is the highest sequence number the producer has claimed, and its data may not have been published yet.

waitStrategy has different implementations, so the wake-up logic is also different. For example, when the BusySpinWaitStrategy strategy is adopted, the consumer spins and waits when it cannot obtain data, and then continues to determine whether there is new data to be consumed. Therefore, the signalAllWhenBlocking of the BusySpinWaitStrategy strategy is an empty implementation and does nothing.

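```java
ringBuffer.publish(sequence);

// RingBuffer.publish: delegate to the sequencer
public void publish(long sequence) {
    sequencer.publish(sequence);
}

// SingleProducerSequencer.publish
public void publish(long sequence) {
    // Update the producer's progress
    cursor.set(sequence);
    // Wake up waiting consumers
    waitStrategy.signalAllWhenBlocking();
}
```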
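Why does a blocking wait strategy need a real wake-up while BusySpinWaitStrategy's signalAllWhenBlocking can be empty? The sketch below shows a simplified, hypothetical lock-and-condition wait in the spirit of BlockingWaitStrategy; it is not that class's actual code. Consumers sleep on a Condition, so the publisher must signal them after advancing the cursor. A busy-spinning consumer re-reads the cursor itself and needs no signal.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a lock-based wait/notify between a producer and its consumers
class LockBasedWait {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private final AtomicLong cursor = new AtomicLong(-1L); // producer progress

    // Consumer side: sleep until the cursor reaches the requested sequence
    long waitFor(long sequence) throws InterruptedException {
        lock.lock();
        try {
            while (cursor.get() < sequence) {
                published.await(); // releases the lock while sleeping
            }
        } finally {
            lock.unlock();
        }
        return cursor.get();
    }

    // Producer side: advance the cursor, then wake every sleeping consumer
    void publish(long sequence) {
        cursor.set(sequence);
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Runs one consumer thread waiting for sequence 0 and returns what it observed
    static long demo() {
        LockBasedWait waiter = new LockBasedWait();
        long[] seen = {-1L};
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = waiter.waitFor(0);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        // Safe whether this runs before or after the consumer starts waiting
        waiter.publish(0);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 0
    }
}
```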

5.4.4 Consuming Data

As mentioned earlier, when the Disruptor starts, it submits the EventProcessor wrapping each EventHandler (BatchEventProcessor in this example) to the thread pool. BatchEventProcessor's run method calls processEvents, which continuously tries to obtain data from the RingBuffer for consumption. The simplified logic of processEvents is analyzed below. Inside a while loop, it calls sequenceBarrier.waitFor to obtain the highest available sequence number.

Continuing the example from the section on getting the sequence number: the producer kept producing while the consumer consumed nothing. The RingBuffer is now full and the producer is blocked. Suppose the consumer starts at this point. nextSequence is 0 and availableSequence is 7, so the consumer can process all 8 produced events in one batch and then update its consumption progress. The producer observes the new progress through Util.getMinimumSequence, stops blocking, and continues publishing.

```java
private void processEvents() {
    T event = null;
    // sequence records this consumer's progress, initially -1
    long nextSequence = sequence.get() + 1L;
    // Infinite loop that never yields the thread, so every EventProcessor needs its own thread
    while (true) {
        // Highest available sequence number, obtained through the barrier
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
        // Batch consumption
        while (nextSequence <= availableSequence) {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        // Update progress once per batch rather than once per event, reducing overhead
        sequence.set(availableSequence);
    }
}
```
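The batching in processEvents can be sketched without threads. In the hypothetical model below (the names are illustrative), a consumer drains every slot from its last position up to availableSequence. It flags only the final event as endOfBatch and writes its progress once per batch. The progressWrites field counts those writes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of BatchEventProcessor's batch loop
class BatchConsumer {
    interface Handler {
        void onEvent(long value, long sequence, boolean endOfBatch);
    }

    long sequence = -1L;    // this consumer's progress, like BatchEventProcessor's Sequence
    int progressWrites = 0; // how many times progress was published

    // ring.length must be a power of 2; availableSequence would come from the barrier
    void consumeAvailable(long[] ring, long availableSequence, Handler handler) {
        if (availableSequence <= sequence) {
            return; // nothing new to consume
        }
        long next = sequence + 1;
        while (next <= availableSequence) {
            long value = ring[(int) (next & (ring.length - 1))];
            handler.onEvent(value, next, next == availableSequence);
            next++;
        }
        sequence = availableSequence; // one progress update for the whole batch
        progressWrites++;
    }

    public static void main(String[] args) {
        long[] ring = {0, 10, 20, 30, 40, 50, 60, 70};
        BatchConsumer consumer = new BatchConsumer();
        List<String> log = new ArrayList<>();
        consumer.consumeAvailable(ring, 7,
                (value, seq, end) -> log.add(seq + ":" + value + (end ? " (end)" : "")));
        System.out.println(log);
        System.out.println("progress=" + consumer.sequence + ", writes=" + consumer.progressWrites);
    }
}
```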

Now let's analyze SequenceBarrier's waitFor method. It first calls the wait strategy's waitFor method to obtain the highest available sequence number. Taking BusySpinWaitStrategy as an example, the first three parameters of its waitFor method mean:

  • sequence: the sequence number the consumer expects to obtain, that is, the current consumer's progress + 1
  • cursor: the current producer's production progress
  • dependentSequence: the consumption progress of the preceding consumers that this consumer depends on. It is set up when an EventHandler is added and its BatchEventProcessor is created. A consumer with no preceding consumers only needs to track the producer and can consume up to wherever the producer has produced, so its dependentSequence is the cursor. A consumer with preceding consumers gets a FixedSequenceGroup(dependentSequences) as its dependentSequence.

Because dependentSequence is divided into two cases, the logic of waitFor can also be divided into two cases for discussion:

  • The current consumer has no preceding consumer. Assume the cursor is 6, that is, the data at sequence number 6 has been published. If the sequence passed in is 6, waitFor returns availableSequence (6) immediately, and the data is consumed normally. After consuming it, the consumer calls waitFor again with sequence 7. availableSequence is still 6 at this point, so the consumer has to spin and wait. Because dependentSequence is the producer's cursor, the consumer notices when the producer publishes more data and continues consuming.
  • The current consumer has preceding consumers. Assume the cursor is 6, and the current consumer C depends on two preceding consumers: A (progress 5) and B (progress 4). dependentSequence is then a FixedSequenceGroup whose get method returns the minimum of A and B, so availableSequence is 4. If C expects to consume index 4, it can do so normally. It cannot consume index 5 until its predecessor B has consumed up to 5.

The wait strategy's waitFor returns the highest available sequence number, availableSequence. The barrier then calls the sequencer's getHighestPublishedSequence to get the highest sequence number that is truly available. The result depends on the producer model. With a single producer, data is published contiguously, so availableSequence is returned directly. With multiple producers, several threads publish data concurrently, so the published sequences may have gaps. In that case getHighestPublishedSequence must find the highest published sequence number with no gaps before it, because consumers must consume sequence numbers in order and cannot skip any.

```java
// ProcessingSequenceBarrier.waitFor
public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    /*
     * sequence: the sequence number the consumer expects
     * cursorSequence: the producer's sequence
     * dependentSequence: the sequence this consumer depends on
     */
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    if (availableSequence < sequence) {
        return availableSequence;
    }
    // The target sequence has been published; get the real highest sequence (depends on the producer model)
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

// BusySpinWaitStrategy.waitFor
public long waitFor(
        final long sequence,
        Sequence cursor,
        final Sequence dependentSequence,
        final SequenceBarrier barrier) throws AlertException, InterruptedException {
    long availableSequence;
    // Ensure this sequence has been consumed by the preceding consumers (coordinates with other consumers)
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        // Spin wait
        ThreadHints.onSpinWait();
    }
    return availableSequence;
}
```
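For the multi-producer case, the gap check in getHighestPublishedSequence can be modelled as below. This is a hypothetical, single-threaded simplification of MultiProducerSequencer's availability buffer. Each slot records the lap number (sequence >>> log2(bufferSize)) of the last sequence published into it, so a slot left over from an earlier lap is not mistaken for new data. The real class writes this buffer with ordered Unsafe stores so that it is safe across threads. Plain array writes are used here.

```java
import java.util.Arrays;

// Hypothetical, single-threaded model of MultiProducerSequencer's availability tracking
class AvailabilityBuffer {
    private final int[] availableBuffer; // per slot: lap number of the last published sequence
    private final int indexMask;
    private final int indexShift;        // log2(bufferSize)

    AvailabilityBuffer(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        availableBuffer = new int[bufferSize];
        Arrays.fill(availableBuffer, -1); // nothing published yet
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize);
    }

    // Called by whichever producer finished writing this sequence; order may vary
    void setAvailable(long sequence) {
        availableBuffer[(int) (sequence & indexMask)] = (int) (sequence >>> indexShift);
    }

    boolean isAvailable(long sequence) {
        return availableBuffer[(int) (sequence & indexMask)] == (int) (sequence >>> indexShift);
    }

    // Highest sequence in [lowerBound, availableSequence] with no unpublished gap before it
    long highestPublished(long lowerBound, long availableSequence) {
        for (long s = lowerBound; s <= availableSequence; s++) {
            if (!isAvailable(s)) {
                return s - 1;
            }
        }
        return availableSequence;
    }

    public static void main(String[] args) {
        AvailabilityBuffer buffer = new AvailabilityBuffer(8);
        // Producers claimed 0..4 (cursor = 4), but sequence 2 is still being written
        for (long s : new long[] {0, 1, 3, 4}) {
            buffer.setAvailable(s);
        }
        System.out.println(buffer.highestPublished(0, 4)); // prints 1
        buffer.setAvailable(2);
        System.out.println(buffer.highestPublished(0, 4)); // prints 4
    }
}
```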

6. Analysis of Disruptor’s high performance principle

6.1 Space Preallocation

As mentioned in the source code analysis above, the RingBuffer maintains an Object array, which is the container that actually stores the data. When the RingBuffer is initialized, every slot of the array is filled with an empty Event created by the EventFactory. No Events need to be created at runtime, which avoids frequent GC.

In addition, all elements of the RingBuffer array are created at once during initialization, so their memory addresses are likely to be contiguous. As described earlier, consumption then benefits from spatial locality. When the first Event is loaded, the CPU also brings the following Events into the cache. The next Event is therefore usually already cached when it is consumed, which can also greatly improve performance.

6.2. Avoiding False Sharing

6.2.1 An example of false sharing

The following code defines a Pointer class with two long member variables, x and y. In the main method, two threads increment x and y of the same Pointer object 100,000,000 times each, and the elapsed time is measured. Over multiple runs on my local computer, the average time was about 3600 ms.

```java
import java.util.StringJoiner;

public class Pointer {
    volatile long x;
    volatile long y;

    @Override
    public String toString() {
        return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]")
                .add("x=" + x)
                .add("y=" + y)
                .toString();
    }
}
```

```java
public static void main(String[] args) throws InterruptedException {
    Pointer pointer = new Pointer();
    int num = 100000000;
    long start = System.currentTimeMillis();
    // Each thread writes only its own field, so x and y are never contended logically
    Thread t1 = new Thread(() -> {
        for (int i = 0; i < num; i++) {
            pointer.x++;
        }
    });
    Thread t2 = new Thread(() -> {
        for (int i = 0; i < num; i++) {
            pointer.y++;
        }
    });
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(System.currentTimeMillis() - start);
    System.out.println(pointer);
}
```

Then modify the Pointer class as follows: insert 7 long type variables between variables x and y, that's all. Then continue to count the time consumption through the above main method, the average is about 500ms. It can be seen that the time consumption before the modification is more than 7 times that after the modification (avoiding false sharing). So what is false sharing, and why can avoiding false sharing lead to such a big performance improvement?

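```java
import java.util.StringJoiner;

public class Pointer {
    volatile long x;
    // 7 padding longs (56 bytes) keep x and y on different 64-byte cache lines,
    // assuming the JVM keeps the declared field order
    long p1, p2, p3, p4, p5, p6, p7;
    volatile long y;

    @Override
    public String toString() {
        return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]")
                .add("x=" + x)
                .add("y=" + y)
                .toString();
    }
}
```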

6.2.2 Why Avoiding False Sharing Can Improve Performance

The access speed of memory is much slower than that of CPU. In order to use CPU efficiently, a cache is added between CPU and memory, which is called CPU Cache. In order to improve performance, more data needs to be obtained from CPU Cache instead of from memory. CPU Cache loads data from memory in units of cache lines (usually 64 bytes). Java's long type is 8 bytes, so one cache line can store 8 long type variables.

However, loading data in whole cache lines has a drawback. Following the example above, suppose there is a long variable x with another long variable y right next to it. Whenever x is loaded, y is loaded with it. Now suppose a thread on CPU Core1 is modifying x while a thread on CPU Core2 is reading y. When Core1 modifies x, the cache line holding both x and y is in Core1's cache, and the update invalidates every other core's copy of that cache line. When the thread on Core2 then reads y, it finds its cache line invalid and must reload it, in the worst case from main memory.

This is false sharing: x and y are unrelated, yet every update of x forces y to be re-read from main memory, which slows the program down. One solution, used in the example above, is to insert 7 long variables between x and y so that the two can never be loaded into the same cache line. Java 8 also added the @Contended annotation (sun.misc.Contended), which avoids false sharing as well; for application classes it only takes effect when the JVM is started with -XX:-RestrictContended.
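To see why exactly 7 long fields of padding are enough, here is a small arithmetic sketch. It assumes a 64-byte cache line, treats field positions as plain byte addresses, and assumes long fields are 8-byte aligned; `CacheLineMath` and its methods are hypothetical names used only for illustration. Because x occupies 8 bytes and the padding adds 56 more, y starts 64 bytes after x, and two addresses 64 bytes apart can never fall into the same 64-byte line, whatever the object's starting alignment.

```java
// Hypothetical illustration only: assumes a 64-byte cache line and 8-byte-aligned
// longs, and treats field offsets as plain byte addresses (the JVM chooses the real layout).
final class CacheLineMath {
    static final int CACHE_LINE_BYTES = 64;
    static final int LONG_BYTES = 8;

    // Index of the cache line holding the byte at this (non-negative) address.
    static long lineOf(long address) {
        return address / CACHE_LINE_BYTES;
    }

    // With 8-byte alignment a long never straddles two lines,
    // so comparing start addresses is enough.
    static boolean shareLine(long xAddr, long yAddr) {
        return lineOf(xAddr) == lineOf(yAddr);
    }

    // Byte distance from x to y when `paddingLongs` long fields sit between them.
    static long distance(int paddingLongs) {
        return (long) LONG_BYTES * (1 + paddingLongs);
    }

    // Of the 8 aligned positions x can take inside one line,
    // how many leave x and y in the same line?
    static int sharedPlacements(int paddingLongs) {
        int shared = 0;
        for (long base = 0; base < CACHE_LINE_BYTES; base += LONG_BYTES) {
            if (shareLine(base, base + distance(paddingLongs))) {
                shared++;
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        System.out.println("no padding:      " + sharedPlacements(0) + " of 8 placements share a line");
        System.out.println("6 longs padding: " + sharedPlacements(6) + " of 8 placements share a line");
        System.out.println("7 longs padding: " + sharedPlacements(7) + " of 8 placements share a line");
    }
}
```

With no padding, x and y share a line in 7 of the 8 possible placements; 6 padding longs still leave one bad placement (x at the very start of a line), and 7 is the first count that rules out sharing entirely.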

6.2.3. Avoiding False Sharing in Disruptor

Disruptor uses the value field of the Sequence class to record production/consumption progress. As the code below shows, 7 long variables are padded before and after this field to avoid false sharing. The array inside RingBuffer, SingleProducerSequencer, and other classes use the same technique.

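    // Padding before value: 7 longs = 56 bytes
    class LhsPadding {
        protected long p1, p2, p3, p4, p5, p6, p7;
    }

    // The actual progress counter
    class Value extends LhsPadding {
        protected volatile long value;
    }

    // Padding after value: another 56 bytes
    class RhsPadding extends Value {
        protected long p9, p10, p11, p12, p13, p14, p15;
    }

    // Sequence extends RhsPadding, so value sits between the two padding blocks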
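The three classes above only define the memory layout. Below is a minimal sketch of how a padded counter in the same style can be made usable, assuming `java.util.concurrent.atomic.AtomicLongFieldUpdater` for the CAS operations; all class names here are hypothetical, and Disruptor's own Sequence uses lower-level memory-access mechanisms instead. Splitting the padding across a class hierarchy matters because HotSpot may reorder fields within a single class but lays out superclass fields before subclass fields, so the padding reliably ends up on both sides of `value`.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical names; mirrors the LhsPadding / Value / RhsPadding layout above.
class SketchLhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class SketchValue extends SketchLhsPadding {
    protected volatile long value;
}

class SketchRhsPadding extends SketchValue {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

final class PaddedSequence extends SketchRhsPadding {
    // The updater must target the class that declares the volatile field.
    private static final AtomicLongFieldUpdater<SketchValue> VALUE =
            AtomicLongFieldUpdater.newUpdater(SketchValue.class, "value");

    PaddedSequence(long initial) {
        this.value = initial;
    }

    long get() {
        return value; // volatile read
    }

    void set(long newValue) {
        value = newValue; // volatile write
    }

    boolean compareAndSet(long expected, long newValue) {
        return VALUE.compareAndSet(this, expected, newValue);
    }

    long incrementAndGet() {
        return VALUE.incrementAndGet(this);
    }
}
```

Starting the counter at -1 follows the convention of Disruptor's Sequence, where -1 means nothing has been produced or consumed yet and the first sequence is 0.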

6.3. Lock-free

When producers produce data, they enqueue it; when consumers consume data, they dequeue it. Enqueuing must not overwrite elements that have not yet been consumed, and dequeuing must not read elements that have not yet been written. Therefore, Disruptor maintains an enqueue index (how far producers have produced, i.e. the cursor in AbstractSequencer) and a dequeue index (the smallest sequence number among all consumers' progress).
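The two constraints above reduce to two comparisons. The sketch below is a simplified, single-threaded model, assuming a ring of `bufferSize` slots, a producer cursor (the last published sequence) and the minimum sequence already processed by all consumers; `IndexRules` and its methods are hypothetical names. Writing sequence s reuses the slot last used by s - bufferSize, so that older sequence must already be consumed; reading s requires that s is not beyond the cursor.

```java
// Hypothetical, simplified model of the enqueue and dequeue indexes.
final class IndexRules {
    // Producer side: sequence `seq` reuses the slot of `seq - bufferSize`,
    // which every consumer must already have processed.
    static boolean canWrite(long seq, int bufferSize, long minConsumed) {
        long wrapPoint = seq - bufferSize;
        return wrapPoint <= minConsumed;
    }

    // Consumer side: only sequences up to the producer cursor have been written.
    static boolean canRead(long seq, long cursor) {
        return seq <= cursor;
    }

    // Largest sequence the producer may write right now.
    static long maxWritable(int bufferSize, long minConsumed) {
        return minConsumed + bufferSize;
    }

    public static void main(String[] args) {
        int bufferSize = 8;
        long cursor = 10;      // producer has published up to sequence 10
        long minConsumed = 3;  // slowest consumer has finished sequence 3
        System.out.println(canWrite(11, bufferSize, minConsumed)); // true: slot of 3 is free
        System.out.println(canWrite(12, bufferSize, minConsumed)); // false: would overwrite 4
        System.out.println(canRead(10, cursor));                   // true
        System.out.println(canRead(11, cursor));                   // false: not written yet
    }
}
```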

The most complex operation in Disruptor is enqueuing. Let's take the next(n) method of MultiProducerSequencer (which claims n sequence numbers) as an example to see how Disruptor achieves lock-free operation. The code is shown below. It checks whether there are enough sequence numbers (free slots); if not, it yields the CPU and checks again; if so, it uses CAS to set the cursor (the enqueue index).

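    public long next(int n) {
        long current;
        long next;
        do {
            // cursor works like the enqueue index: the position production last reached
            current = cursor.get();
            // the goal is to produce n more
            next = current + n;
            // analyzed earlier: used to check whether the producer would lap the slowest
            // consumer, i.e. whether n sequence numbers can be claimed
            long wrapPoint = next - bufferSize;
            // cached value of the consumers' minimum progress from the last check
            long cachedGatingSequence = gatingSequenceCache.get();
            // Step 1: if there is not enough space, keep waiting
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                // recompute the minimum progress across all consumers
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                // still not enough space: give up the CPU
                if (wrapPoint > gatingSequence) {
                    LockSupport.parkNanos(1);
                    continue;
                }
                // update the cached minimum consumer progress
                gatingSequenceCache.set(gatingSequence);
            }
            // Step 2: space looks sufficient, try to claim it with CAS
            else if (cursor.compareAndSet(current, next)) {
                break;
            }
        } while (true);
        return next;
    }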
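To experiment with this logic outside Disruptor, here is a stripped-down, self-contained sketch of the same claim loop. It assumes `AtomicLong` for the cursor, for the cached gating sequence and for each consumer's progress; `ClaimSketch`, `next` and `tryNext` are hypothetical names that only imitate the shape of MultiProducerSequencer. As in the original, claiming is separate from publishing, which is not modeled here. `tryNext` shows a non-blocking variant that returns -1 when there is no room (Disruptor's real tryNext throws InsufficientCapacityException instead).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified imitation of the multi-producer claim loop.
final class ClaimSketch {
    private final int bufferSize;
    private final AtomicLong cursor = new AtomicLong(-1);              // last claimed sequence
    private final AtomicLong gatingSequenceCache = new AtomicLong(-1); // cached min consumer progress
    private final AtomicLong[] gatingSequences;                        // one per consumer

    ClaimSketch(int bufferSize, AtomicLong... consumers) {
        this.bufferSize = bufferSize;
        this.gatingSequences = consumers;
    }

    // Minimum progress across consumers, never above `current`.
    private long minimumSequence(long current) {
        long min = current;
        for (AtomicLong s : gatingSequences) {
            min = Math.min(min, s.get());
        }
        return min;
    }

    // Blocking claim of n sequences; returns the highest claimed sequence.
    long next(int n) {
        if (n < 1 || n > bufferSize) throw new IllegalArgumentException("bad n: " + n);
        while (true) {
            long current = cursor.get();
            long next = current + n;
            long wrapPoint = next - bufferSize;
            long cached = gatingSequenceCache.get();
            if (wrapPoint > cached || cached > current) {
                long gating = minimumSequence(current);
                if (wrapPoint > gating) {
                    LockSupport.parkNanos(1); // no room yet: back off, then retry
                    continue;
                }
                gatingSequenceCache.set(gating);
            } else if (cursor.compareAndSet(current, next)) {
                return next; // this producer now owns sequences current+1 .. next
            }
        }
    }

    // Non-blocking variant: returns -1 instead of waiting when there is no room.
    long tryNext(int n) {
        if (n < 1 || n > bufferSize) throw new IllegalArgumentException("bad n: " + n);
        while (true) {
            long current = cursor.get();
            long next = current + n;
            if (next - bufferSize > minimumSequence(current)) {
                return -1;
            }
            if (cursor.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}
```

If two producers read the same `current`, only one CAS succeeds; the other simply re-reads the cursor and tries again. That retry loop, instead of a lock, is what makes the claim lock-free.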

6.4. Support for Batch Consumption

This one is easier to understand. As described earlier when analyzing how data is consumed, a consumer fetches the largest available sequence number and then processes all messages up to that sequence in one batch.
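A minimal sketch of that batch loop, assuming a single consumer over a plain array and a value standing in for the largest available sequence; `BatchSketch` and its `Handler` interface are hypothetical, with the `endOfBatch` flag imitating the parameter Disruptor passes to EventHandler.onEvent. Like Disruptor, the sketch assumes a power-of-two ring size so a sequence maps to a slot with a bit mask. One read of the available sequence lets the consumer process many events without further coordination.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {
    // Hypothetical handler shaped like onEvent(event, sequence, endOfBatch).
    interface Handler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch);
    }

    // Processes every sequence from nextSequence to availableSequence in one pass
    // and returns the new consumer progress (the last processed sequence).
    static <T> long consumeBatch(T[] ring, long nextSequence, long availableSequence, Handler<T> handler) {
        int mask = ring.length - 1; // assumes ring.length is a power of two
        for (long seq = nextSequence; seq <= availableSequence; seq++) {
            T event = ring[(int) (seq & mask)];
            handler.onEvent(event, seq, seq == availableSequence);
        }
        return availableSequence;
    }

    public static void main(String[] args) {
        String[] ring = {"a", "b", "c", "d", "e", "f", "g", "h"};
        List<String> seen = new ArrayList<>();
        long progress = consumeBatch(ring, 0, 4,
                (e, seq, end) -> seen.add(seq + ":" + e + (end ? " (end of batch)" : "")));
        System.out.println(seen);
        System.out.println("consumer progress = " + progress);
    }
}
```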

7. Use of Disruptor in the i-theme business

Many open source projects use Disruptor: for example, the logging framework Log4j 2 uses it to implement asynchronous logging, and HBase, Storm, and other projects use it as well. vivo's i-theme business also uses Disruptor; its two usage scenarios are briefly introduced below.

7.1. Monitoring data reporting

A business monitoring system is very important to an enterprise: it helps discover and solve problems in time, makes business metrics easy to observe, supports better business decisions, and helps keep the business developing sustainably. iTheme uses Disruptor (multiple producers, single consumer) to temporarily store the business metric data to be reported, and scheduled tasks continuously extract the data and report it to the monitoring platform, as shown in the figure below.



7.2. Statistical analysis of local cache keys

The iTheme business uses a lot of local caches. To count the number of distinct keys in the local caches and the number of keys for each key pattern, Disruptor is used to temporarily store the access data and consume it for processing. Because local caches are accessed from many places in the business code, there are multiple producer threads. Since the consumption logic is relatively simple, and multiple consumer threads would require lock synchronization, a single-threaded consumer is used.

The overall process is shown in the figure below. First, a call that records cache access statistics is added to the cache access utility class. After the cache access data enters the RingBuffer, a single-threaded consumer uses HyperLogLog to count the number of distinct keys (with deduplication), and uses regular expression matching to count the number of keys for each pattern. Asynchronous tasks then periodically fetch the statistics for display.
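The consumer side of this pipeline can be sketched without Disruptor itself. The sketch assumes a single consumer thread calls `onEvent` once per cache access and counts each distinct key once per matching pattern. It uses an exact `HashSet` in place of HyperLogLog, so it is only a stand-in: HyperLogLog gives an approximate distinct count in fixed memory, while a set grows with the number of keys. `KeyStatsConsumer`, the pattern names and the regular expressions are hypothetical. Because only one thread mutates the counters, no locks are needed; a reporting task on another thread would still need a safe hand-off of results, which this sketch leaves out.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical single-threaded consumer for cache-access events.
final class KeyStatsConsumer {
    // Exact stand-in for HyperLogLog: memory grows with the number of distinct keys.
    private final Set<String> distinctKeys = new HashSet<>();
    private final Map<String, Pattern> patterns = new LinkedHashMap<>();
    private final Map<String, Long> countsByPattern = new LinkedHashMap<>();

    void addPattern(String name, String regex) {
        patterns.put(name, Pattern.compile(regex));
        countsByPattern.put(name, 0L);
    }

    // Called once per cache access, always from the same consumer thread.
    void onEvent(String key) {
        if (!distinctKeys.add(key)) {
            return; // key already counted (deduplication)
        }
        for (Map.Entry<String, Pattern> e : patterns.entrySet()) {
            if (e.getValue().matcher(key).matches()) {
                countsByPattern.merge(e.getKey(), 1L, Long::sum);
            }
        }
    }

    long distinctCount() {
        return distinctKeys.size();
    }

    Map<String, Long> countsByPattern() {
        return new LinkedHashMap<>(countsByPattern);
    }
}
```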

Note that the RingBuffer has a fixed size. If the producer produces faster than the consumer can consume and the next method is used to claim sequence numbers, the producer blocks whenever there is not enough free space. It is therefore recommended to publish data with the tryPublishEvent method. It claims sequence numbers through the tryNext method internally; when no sequence number is available, tryNext throws an InsufficientCapacityException, which tryPublishEvent catches and reports by returning false. This way the producer notices the condition and can handle it gracefully instead of blocking and waiting.
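A minimal sketch of this non-blocking publishing pattern. To keep it runnable without the Disruptor dependency, it uses the JDK's `ArrayBlockingQueue.offer` as a stand-in, because it has the contract that matters here: it returns false immediately when the bounded buffer is full instead of blocking. With Disruptor, the corresponding call is RingBuffer.tryPublishEvent, which likewise returns false on insufficient capacity. `CacheAccessRecorder` and its drop counter are hypothetical; dropping events is just one possible fallback, reasonable when the events only feed statistics.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical recorder called from business threads on every cache access.
final class CacheAccessRecorder {
    private final BlockingQueue<String> buffer;        // stand-in for the RingBuffer
    private final LongAdder dropped = new LongAdder(); // events discarded because the buffer was full

    CacheAccessRecorder(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Never blocks the business thread: if the buffer is full, count a drop and move on.
    // With Disruptor this would be: if (!ringBuffer.tryPublishEvent(translator, key)) { ... }
    void record(String key) {
        if (!buffer.offer(key)) {
            dropped.increment();
        }
    }

    long droppedCount() {
        return dropped.sum();
    }

    // Consumer side, for illustration only: take one pending event, or null if none.
    String poll() {
        return buffer.poll();
    }
}
```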


8. Usage suggestions

  • Disruptor is based on the producer-consumer model. If production is fast and consumption is slow, the producer can no longer write data once the buffer is full. Therefore, it is not recommended to run long-running business logic in a Disruptor consumer thread.
  • Each EventHandler corresponds to one thread, and a thread serves only one EventHandler: Disruptor has to create a thread for each EventHandler (EventProcessor). Therefore, when creating a Disruptor, it is not recommended to pass in a specific thread pool; instead, let Disruptor create the corresponding threads itself according to the number of EventHandlers.
  • When a producer calls the next method to claim a sequence number and none is available, it blocks, which deserves attention. It is recommended to use the tryPublishEvent method instead: when no sequence number can be claimed, it returns immediately without blocking the business thread.
  • If you use the next method to claim a sequence number, make sure to call publish in a finally block so that the data is actually published.
  • Choose a wait strategy deliberately. When consumers cannot get data, they wait according to the configured wait strategy. BlockingWaitStrategy is the least efficient strategy but uses the least CPU. YieldingWaitStrategy offers lower latency and higher throughput at the cost of higher CPU usage, so it suits machines with enough CPU cores.
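The finally advice in the list above exists because a claimed but never-published sequence leaves a gap that consumers cannot move past. The sketch below models this with a tiny hypothetical `ClaimPublishRing`, whose `next`, `set` and `publish` loosely imitate the claim, fill and publish steps of RingBuffer.next(), get(long) and publish(long); it is single-threaded and ignores wrap-around and capacity. `PublishPatterns` is also hypothetical. With publish in finally, a failure while filling still releases the sequence (consumers may then see a stale or partially filled event, which is the trade-off for keeping the sequence moving).

```java
// Hypothetical single-threaded model: no wrap-around, no capacity checks.
final class ClaimPublishRing {
    private final String[] slots;
    private final boolean[] published;
    private long nextToClaim = 0;

    ClaimPublishRing(int size) {
        slots = new String[size];
        published = new boolean[size];
    }

    long next() {                      // claim one sequence
        return nextToClaim++;
    }

    void set(long seq, String value) { // fill the claimed slot
        slots[(int) seq] = value;
    }

    void publish(long seq) {           // make the slot visible to consumers
        published[(int) seq] = true;
    }

    // Consumers can only read up to the first unpublished sequence.
    long availableUpTo() {
        long seq = -1;
        while (seq + 1 < nextToClaim && published[(int) (seq + 1)]) {
            seq++;
        }
        return seq;
    }
}

final class PublishPatterns {
    // Recommended: publish in finally, so a failure while filling cannot leave a gap.
    static void publishSafely(ClaimPublishRing ring, String value, boolean failWhileFilling) {
        long seq = ring.next();
        try {
            if (failWhileFilling) {
                throw new IllegalStateException("failed to fill event " + seq);
            }
            ring.set(seq, value);
        } finally {
            ring.publish(seq);
        }
    }

    // Risky: if filling throws, the claimed sequence is never published.
    static void publishUnsafely(ClaimPublishRing ring, String value, boolean failWhileFilling) {
        long seq = ring.next();
        if (failWhileFilling) {
            throw new IllegalStateException("failed to fill event " + seq);
        }
        ring.set(seq, value);
        ring.publish(seq);
    }
}
```

In the risky version, one failed fill stops consumers at the sequence before it, even though later sequences were published successfully.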

9. Conclusion

This article first introduced the high-performance lock-free memory queue Disruptor by comparing it with the JDK's built-in thread-safe queues and outlining its features. It then introduced Disruptor's core concepts and basic usage to give readers a preliminary understanding. Next, it examined Disruptor's core implementation and the principles behind its high performance (space pre-allocation, avoiding false sharing, lock-free operation, and batch consumption) from the perspective of source code. After that, it showed how Disruptor is applied in practice in the i-theme business. Finally, based on this analysis and practice, it summarized some Disruptor best practices.


Reference articles:

https://time.geekbang.org/column/article/132477

https://lmax-exchange.github.io/disruptor/

