1. Introduction to iTheme and Disruptor

iTheme is vivo's theme store app. Users can download themes, wallpapers, fonts, and more to change and customize the phone's interface style with one click.

Disruptor is a high-performance in-memory queue developed by LMAX, a British foreign-exchange trading company. It is used to pass messages between threads within a single process, which distinguishes it from distributed message queues such as RocketMQ and Kafka. A system built on Disruptor can support six million orders per second. Many well-known projects, including Apache Storm, Camel, and Log4j 2, use Disruptor to obtain high performance. It also has many applications within vivo: for example, custom monitoring uses a Disruptor queue to buffer monitoring data reported by the monitoring SDK, and iTheme uses it to aggregate local in-memory metric data.

The rest of this article introduces Disruptor from the following angles: a comparison with the JDK's built-in queues, Disruptor's core concepts, a usage demo, its core source code, the principles behind its high performance, and its application in the iTheme business.

2. Comparison with Built-in Queues in the JDK

Let's compare the JDK's built-in queues with Disruptor. Queue implementations generally fall into three categories by underlying data structure: arrays, linked lists, and heaps. Heaps are normally used to implement priority queues, so they are not considered here. ConcurrentLinkedQueue and LinkedTransferQueue are unbounded; in systems with particularly high stability requirements, only bounded queues can be used, to prevent a fast producer from causing memory overflow. That leaves ArrayBlockingQueue and LinkedBlockingQueue as the thread-safe candidates in the JDK. LinkedBlockingQueue is implemented with a linked list, so its data is not contiguous in memory, which is unfriendly to CPU caches; it also relies on locks, which hurts performance. ArrayBlockingQueue has the same locking problem and, in addition, suffers from false sharing, which also degrades performance.

Disruptor, the subject of this article, is an array-based, bounded, lock-free queue. It benefits from spatial locality and makes good use of the CPU cache, and it avoids false sharing, which together greatly improve performance.

3. Disruptor Core Concepts

As shown in the figure below, we can first form an intuitive picture of Disruptor from the perspective of data flow. Disruptor supports single- and multi-producer as well as single- and multi-consumer modes. On the consumption side it supports both broadcast consumption (HandlerA processes every message, and HandlerB also processes every message) and clustered consumption (HandlerA and HandlerB each consume a share of the messages). After HandlerA and HandlerB finish consuming, they hand the message over to HandlerC for further processing.

Based on Disruptor's official architecture diagram, the core concepts are:

- RingBuffer: the ring-shaped array that actually stores the Event data; producers write into it and consumers read from it.
- Sequence: a monotonically increasing sequence number recording the progress of a producer or consumer, padded to avoid false sharing.
- Sequencer: the coordination core of Disruptor (with single-producer and multi-producer implementations) that allocates sequence numbers to producers.
- SequenceBarrier: the barrier a consumer waits on; it tracks the producer's cursor and the Sequences of any preceding consumers.
- WaitStrategy: the strategy a consumer uses to wait when no data is available (blocking, busy-spin, and so on).
- Event: the business data object stored in the RingBuffer.
- EventHandler: the user-defined consumption logic.
- EventProcessor: the runnable that drives an EventHandler, continuously fetching events from the RingBuffer (BatchEventProcessor is the common implementation).
4. Disruptor Demo

4.1 Defining Events

An Event is the concrete data entity: producers create Events and store them in the RingBuffer, and consumers take them from the RingBuffer for processing. An Event is just an ordinary Java object and does not need to implement any interface defined by Disruptor.

4.2 Defining an EventFactory

The EventFactory is used to create Event objects.

4.3 Defining Producers

As you can see, the producer mainly holds the RingBuffer object and uses it to publish data. Two points are worth noting: a sequence number claimed with next() must always be published, typically in a finally block, otherwise downstream consumers will stall on the gap; and next() blocks when the ring is full, a behaviour discussed further in Section 7.2.
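Since the original code listings are not reproduced here, a minimal sketch of these three pieces might look like the following. Class and field names (OrderEvent, OrderEventFactory, OrderEventProducer) are illustrative, not the original demo's.

```java
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;

// Event: a plain Java object that carries the business data.
public class OrderEvent {
    private long value;

    public long getValue() { return value; }
    public void setValue(long value) { this.value = value; }
}

// EventFactory: used by the RingBuffer to pre-create empty events.
class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

// Producer: holds the RingBuffer and publishes data into it.
class OrderEventProducer {
    private final RingBuffer<OrderEvent> ringBuffer;

    OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void publish(long value) {
        long sequence = ringBuffer.next();               // claim the next sequence (may block when the ring is full)
        try {
            OrderEvent event = ringBuffer.get(sequence); // fetch the pre-allocated event at that slot
            event.setValue(value);                       // fill in the business data
        } finally {
            ringBuffer.publish(sequence);                // always publish, even if filling the event failed
        }
    }
}
```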
4.4 Defining Consumers

A consumer implements the EventHandler interface and defines its own processing logic.

4.5 Main Process

The main process creates the Disruptor, registers the consumer(s), starts the Disruptor, and then publishes data through the producer.
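A hedged sketch of the consumer and the main process, reusing the illustrative OrderEvent classes above (the Disruptor constructor shown here takes a ThreadFactory; older versions take an Executor instead):

```java
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

// Consumer: implements EventHandler and defines its own processing logic.
class OrderEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        System.out.println("consume sequence " + sequence + ", value=" + event.getValue());
    }
}

public class DisruptorDemo {
    public static void main(String[] args) {
        int bufferSize = 1024; // must be a power of 2

        // Create the Disruptor, register the consumer, and start it.
        Disruptor<OrderEvent> disruptor =
                new Disruptor<>(new OrderEventFactory(), bufferSize, DaemonThreadFactory.INSTANCE);
        disruptor.handleEventsWith(new OrderEventHandler());
        disruptor.start();

        // Publish a few events through the producer defined above.
        OrderEventProducer producer = new OrderEventProducer(disruptor.getRingBuffer());
        for (long i = 0; i < 10; i++) {
            producer.publish(i);
        }
    }
}
```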
5. Disruptor Source Code Analysis

This article uses single (or multiple) producers and a single consumer as the example for analysis.

5.1 Creating a Disruptor

First, a RingBuffer is created from the passed-in parameters, and the created RingBuffer together with the passed-in executor is handed to the Disruptor object. Next we analyze how the RingBuffer itself is created, which differs between the single-producer and multi-producer cases. In both cases a RingBuffer object is eventually created; what differs is the Sequencer object passed to it (SingleProducerSequencer versus MultiProducerSequencer). As you can see, an Object array is ultimately created inside the RingBuffer to store the Event data. A few points are worth noting: the buffer size must be a power of two; the array is pre-populated with empty Events created by the EventFactory (space preallocation, see Section 6.1); and extra padding surrounds the array to avoid false sharing (see Section 6.2.3).
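To make the single-producer versus multi-producer branch concrete, here is a hedged sketch of the two creation paths using the public Disruptor 3.x API (OrderEvent classes reused from the demo above; the wait strategy is an arbitrary choice for illustration):

```java
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class RingBufferCreation {
    public static void main(String[] args) {
        // Creating the RingBuffer directly: the factory method decides which
        // Sequencer (SingleProducerSequencer vs MultiProducerSequencer) backs it.
        RingBuffer<OrderEvent> single =
                RingBuffer.createSingleProducer(new OrderEventFactory(), 1024, new BlockingWaitStrategy());
        RingBuffer<OrderEvent> multi =
                RingBuffer.createMultiProducer(new OrderEventFactory(), 1024, new BlockingWaitStrategy());
        System.out.println(single.getBufferSize() + " / " + multi.getBufferSize());

        // Through the Disruptor DSL the same choice is made via ProducerType.
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(), 1024, DaemonThreadFactory.INSTANCE,
                ProducerType.MULTI, new BlockingWaitStrategy());
        System.out.println(disruptor.getRingBuffer().getBufferSize());
    }
}
```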
5.2 Adding Consumers

The core of adding a consumer is to wrap an EventHandler in a BatchEventProcessor and add it to the consumerRepository. When the Disruptor is started later, it traverses all the BatchEventProcessors (which implement Runnable) in the consumerRepository and submits them to the thread pool.

After the Disruptor object is created, EventHandlers can be added through it. One thing to note: when handleEventsWith is called directly on the Disruptor object, an empty Sequence array is passed in. What does that mean? The parameter of the createEventProcessors method that receives this empty array is named barrierSequences, literally "barrier sequence numbers". How should this field be understood? Suppose two handlers, handlerA and handlerB, are added to the Disruptor in a chain (the first wiring in the sketch below). This is serial consumption: for any Event, handlerA must consume it before handlerB may. handlerA has no preceding consumer (it can consume anything the producer has published), so its barrierSequences is an empty array. handlerB's preceding consumer is handlerA, so its barrierSequences holds handlerA's consumption progress, meaning handlerB's progress can never exceed handlerA's. If the handlers are instead registered together (the second wiring in the sketch below), handlerA and handlerB each consume every Event, similar to broadcast consumption in MQ systems, and handlerC's barrierSequences contains the consumption progress of both handlerA and handlerB. That is why barrierSequences is an array: when handlerC later consumes, it takes the smaller of A's and B's progress as its limit. For example, if A has consumed to progress 6 and B only to progress 4, then C can only consume data up to index 3. That is the meaning of barrierSequences.
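Since the original wiring snippets are not reproduced here, a hedged sketch of the two registration styles might look like this (the handlers are placeholder lambdas; classes reuse the illustrative demo above):

```java
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class HandlerWiring {
    public static void main(String[] args) {
        EventHandler<OrderEvent> handlerA = (event, sequence, endOfBatch) -> { /* ... */ };
        EventHandler<OrderEvent> handlerB = (event, sequence, endOfBatch) -> { /* ... */ };
        EventHandler<OrderEvent> handlerC = (event, sequence, endOfBatch) -> { /* ... */ };

        // Serial consumption: for every event, handlerA runs before handlerB,
        // so handlerB's barrierSequences contains handlerA's Sequence.
        Disruptor<OrderEvent> serial =
                new Disruptor<>(new OrderEventFactory(), 1024, DaemonThreadFactory.INSTANCE);
        serial.handleEventsWith(handlerA).then(handlerB);

        // Broadcast consumption: handlerA and handlerB each see every event, and
        // handlerC's barrierSequences contains both of their consumption progresses.
        Disruptor<OrderEvent> broadcast =
                new Disruptor<>(new OrderEventFactory(), 1024, DaemonThreadFactory.INSTANCE);
        broadcast.handleEventsWith(handlerA, handlerB).then(handlerC);
    }
}
```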
5.3 Starting the Disruptor

The startup logic is relatively simple: it traverses the EventProcessors (which implement Runnable) collected in the consumerRepository and submits them to the executor specified when the Disruptor was created. The run method of each EventProcessor starts a while loop that continually tries to fetch data from the RingBuffer for consumption.

5.4 Publishing Data

Before analyzing the source code of data publishing, let's review the overall process: the producer claims a sequence number, fetches the pre-allocated Event at that slot, fills in the business data, and finally publishes the sequence number.

5.4.1 Claiming a Sequence Number

By default the next method claims one sequence number. nextValue is the sequence number already allocated, nextSequence means claiming n more sequence numbers on top of it (n is 1 here), and cachedValue is the cached minimum consumption progress of the consumers. Suppose the RingBuffer has size 8, the data at sequence 6 has been published (nextValue is 6), and the consumers have not started consuming yet (cachedValue and cachedGatingSequence are -1). The producer now wants to publish more data and calls next() to claim sequence number 7 (nextSequence is 7). The calculated wrapPoint is 7 - 8 = -1, which equals cachedGatingSequence, so publishing may continue, as shown in the left figure. Finally nextValue is set to 7, indicating that the slot for sequence 7 has been claimed by the producer. The producer then calls next() again to claim sequence 8 (which wraps to index 0 in the ring). Now nextValue is 7, nextSequence is 8, and wrapPoint is 0. Since the consumers still have not consumed anything (cachedGatingSequence is -1), wrapPoint is greater than cachedGatingSequence, the if condition in next holds, and LockSupport.parkNanos is called to block and wait for the consumers. The getMinimumSequence method is used to obtain the minimum consumption progress across all consumers.

5.4.2 Fetching the Event by Sequence Number

The Event object at the claimed sequence number is obtained directly through the Unsafe utility class. At this point it is still an empty object, so the next step is to fill it with business values; once that is done, the publish method is called to actually publish the data.

5.4.3 Publishing Data

After obtaining an available sequence number, the producer first assigns the business values to the empty Event at that slot and then calls RingBuffer's publish method. The RingBuffer delegates to the Sequencer it holds (single-producer and multi-producer modes use different Sequencers) to do the actual publishing. The single-producer publishing logic is simple: it updates the cursor (the cursor represents the producer's publishing progress, i.e. positions that really contain published data, whereas nextSequence in the next method is only the highest sequence number claimed, whose data may not have been published yet) and then wakes up waiting consumers. Different waitStrategy implementations wake consumers differently; for example, with BusySpinWaitStrategy a consumer simply busy-spins when no data is available and keeps re-checking, so BusySpinWaitStrategy's signalAllWhenBlocking is an empty implementation that does nothing.

5.4.4 Consuming Data

As mentioned earlier, when the Disruptor starts it submits the EventProcessor that wraps each EventHandler (BatchEventProcessor is used as the example here) to the thread pool. The run method of BatchEventProcessor calls processEvents, which continually tries to fetch data from the RingBuffer for consumption. The logic of processEvents is analyzed below (the code has been simplified).
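Since the simplified listing is not reproduced here, the core loop of BatchEventProcessor.processEvents is roughly the following. This is a paraphrased sketch, not the verbatim library source: the fields (sequence, sequenceBarrier, dataProvider, eventHandler, exceptionHandler) are members of the processor, and halt/alert handling is omitted.

```java
// Paraphrased sketch of BatchEventProcessor's consume loop.
private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;   // the next sequence this consumer wants

    while (true) {
        try {
            // Wait until at least nextSequence is available; usually a larger
            // availableSequence is returned, which enables batch consumption.
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);

            // Consume every published event in the batch.
            while (nextSequence <= availableSequence) {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }

            // Publish this consumer's progress so the producer (and any
            // downstream consumers) can move forward.
            sequence.set(availableSequence);
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
```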
processEvents starts a while loop and calls sequenceBarrier.waitFor to obtain the maximum available sequence number. Continuing the example from the section on claiming sequence numbers: the producer keeps producing while the consumer has not consumed anything, so the producer eventually fills the entire RingBuffer and blocks because it can produce no further. Suppose the consumer now starts consuming: nextSequence is 0 and availableSequence is 7, so the consumer can consume all 8 produced entries in one batch. Once consumption finishes, the consumption progress is updated, the producer perceives the new progress through Util.getMinimumSequence, stops blocking, and continues to publish data.

Now let's analyze the waitFor method of SequenceBarrier. It first calls the waitFor method of the waitStrategy to obtain the maximum available sequence number. Taking BusySpinWaitStrategy as an example, the parameters of its waitFor method mean the following: sequence is the next sequence number the consumer wants to consume; cursor is the producer's publishing progress; and dependentSequence is the sequence this consumer must wait behind, which is the cursor itself when there are no preceding consumers, or the preceding consumers' Sequences otherwise.
Because dependentSequence covers two cases, the logic of waitFor can be discussed in two cases as well. When the consumer has no preceding consumers, dependentSequence is the producer's cursor, so waitFor waits until the producer has published at least the requested sequence. When the consumer does have preceding consumers, dependentSequence represents their consumption progress, so waitFor waits until all of them have moved past the requested sequence before this consumer may proceed.
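A paraphrased sketch of BusySpinWaitStrategy.waitFor that covers both cases (the real method also declares InterruptedException and spins via the library's ThreadHints helper, approximated here with Java 9's Thread.onSpinWait()):

```java
// Paraphrased sketch of BusySpinWaitStrategy.waitFor.
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException {
    long availableSequence;
    // Busy-spin until the dependent sequence catches up: the producer cursor when
    // this consumer has no preceding consumers, otherwise the preceding consumers'
    // combined consumption progress.
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();      // stop spinning if the processor has been halted
        Thread.onSpinWait();       // hint to the CPU that we are busy-waiting
    }
    return availableSequence;
}
```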
After the waitFor method of the waitStrategy returns the maximum available sequence number availableSequence, the sequencer's getHighestPublishedSequence method must be called to obtain the highest sequence number that is truly available. This depends on the producer mode. With a single producer, data is published contiguously, so the passed-in availableSequence is returned as is. With multiple producers, several threads publish concurrently and the published data is not necessarily contiguous, so getHighestPublishedSequence is needed to find the highest published and contiguous sequence number, because sequence numbers must be consumed in order and cannot be skipped.

6. Analysis of Disruptor's High-Performance Principles

6.1 Space Preallocation

As mentioned in the source code analysis, the RingBuffer maintains an Object array (the container that actually stores the data). When the RingBuffer is initialized, this array is already filled with empty Events created by the EventFactory, so nothing needs to be allocated at runtime and frequent GC is avoided. In addition, because the elements of the RingBuffer array are all created at once during initialization, their memory addresses are very likely to be contiguous. Consumption then benefits from spatial locality: after the first Event is consumed, the second will be consumed soon after, and when the CPU loads the first Event it also loads the Events that follow it into the cache. By the time the second Event is consumed it is already in the CPU cache and does not need to be fetched from memory, which also greatly improves performance.

6.2 Avoiding False Sharing

6.2.1 An Example of False Sharing

Consider a Pointer class with two long member variables, x and y. In a main method, two threads increment x and y of the same Pointer object 100,000,000 times each, and the total time is measured. After several runs on the author's machine the average was about 3,600 ms. Now modify the Pointer class by inserting seven long variables between x and y, and nothing else. Measured the same way, the average drops to about 500 ms. The version before the modification is therefore more than seven times slower than the version after it (which avoids false sharing). A hedged reconstruction of this benchmark is shown below. So what is false sharing, and why does avoiding it bring such a large performance improvement?
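A hedged reconstruction of the benchmark described above (the original listing is not reproduced; class and field names are illustrative, the fields are made volatile here so the writes actually hit the cache line, and absolute timings will vary by machine):

```java
// False-sharing demo: two threads update adjacent long fields of the same object.
public class FalseSharingDemo {

    static class Pointer {
        volatile long x;
        // Uncomment the padding below to place x and y on different cache lines;
        // in the measurements quoted above this cut the runtime from ~3600 ms to ~500 ms.
        // long p1, p2, p3, p4, p5, p6, p7;
        volatile long y;
    }

    public static void main(String[] args) throws InterruptedException {
        final Pointer pointer = new Pointer();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100_000_000; i++) { pointer.x++; }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100_000_000; i++) { pointer.y++; }
        });

        long start = System.currentTimeMillis();
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("took " + (System.currentTimeMillis() - start) + " ms");
    }
}
```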
6.2.2 Why Avoiding False Sharing Improves Performance

Memory access is much slower than the CPU, so to use the CPU efficiently a cache, the CPU Cache, sits between the CPU and main memory. For good performance, as much data as possible should be served from the CPU Cache rather than from memory. The CPU Cache loads data from memory in units of cache lines, usually 64 bytes. A Java long is 8 bytes, so one cache line can hold 8 long variables. This style of loading has a downside, however. As in the example above, suppose a long variable x sits next to another long variable y; whenever x is loaded, y is loaded with it. Now suppose a thread on CPU Core1 is modifying x while a thread on another core, Core2, is reading y. When Core1's thread modifies x, both x and y are loaded into Core1's cache, and after the update every cache line containing x in the other cores' caches is invalidated. When Core2's thread then reads y, it finds its cache line invalidated and must reload it from main memory. This is false sharing: x and y are unrelated, yet an update to x forces y to be re-read from main memory, slowing the program down. One solution is to pad 7 long variables between x and y, as in the example above, so that x and y never land in the same cache line. Java 8 also adds the @Contended annotation (effective when the JVM is started with -XX:-RestrictContended), which likewise avoids false sharing.

6.2.3 Avoiding False Sharing in Disruptor

Disruptor uses the value field of the Sequence class to record production/consumption progress, and 7 long variables are padded before and after this field to avoid false sharing. The array inside the RingBuffer, the SingleProducerSequencer, and other classes use the same technique.

6.3 Lock-Free Design

Producers enqueue data and consumers dequeue it. When enqueuing, unconsumed elements must not be overwritten; when dequeuing, unwritten elements must not be read. Disruptor therefore maintains an enqueue index (how far the producers have produced, corresponding to the cursor in AbstractSequencer) and a dequeue index (the smallest consumption progress among all consumers). The most complicated operation is enqueuing. Take the next(n) method of MultiProducerSequencer (which claims n sequence numbers) as an example of how Disruptor stays lock-free: it checks whether there are enough free slots; if not, it yields the CPU and checks again; if so, it uses CAS to advance the cursor (the enqueue index).

6.4 Batch Consumption

This one is easier to understand. As introduced when analyzing the consumption logic, a consumer obtains the largest available sequence number and then consumes all messages up to it in one batch.

7. The Use of Disruptor in the iTheme Business

Many open-source projects use Disruptor; for example, the logging framework Log4j 2 uses it to implement asynchronous logging, and it also appears in HBase, Storm, and other projects. vivo's iTheme business uses Disruptor as well. Two usage scenarios are briefly introduced below.

7.1 Monitoring Data Reporting

A business monitoring system is very important to an enterprise: it helps discover and resolve problems in time, makes business indicator data easy to observe, supports better business decisions, and safeguards the sustainable development of the business. iTheme uses Disruptor (multiple producers, single consumer) to buffer the business indicator data to be reported, and scheduled tasks then continuously pull the data and report it to the monitoring platform, as shown in the figure below.

7.2 Statistical Analysis of Local Cache Keys

The iTheme business uses local caches heavily. To count the number of distinct keys in the local caches and the number of keys matching each cache pattern, Disruptor is used to buffer the access records and process them on the consumer side. Because many places in the business code access local caches, the producers are multi-threaded.
Since the consumption logic is simple and multi-threaded consumption would require lock synchronization, the consumer runs in single-threaded mode. The overall flow is shown in the figure below. First, a statistics call is added to the cache-access utility class. Once the cache-access records enter the RingBuffer, the single-threaded consumer uses HyperLogLog to count the number of distinct keys and regular-expression matching to count the keys under each cache pattern; scheduled asynchronous tasks then fetch the statistics periodically for display. One thing to note: because the RingBuffer has a fixed size, a producer that uses the next method to claim a sequence number will block when the ring is full and the consumer cannot keep up. It is therefore recommended to publish with tryPublishEvent, which internally uses tryNext to claim a sequence number; when no sequence is available, tryNext throws InsufficientCapacityException, which tryPublishEvent turns into a false return value. The producer can then detect the failure and handle it gracefully instead of blocking and waiting (a sketch of this pattern follows below).

8. Use Suggestions
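One such suggestion, carried over from Section 7.2, is to prefer non-blocking publication when producers must never stall. A hedged sketch of this pattern, reusing the illustrative OrderEvent classes from the demo above (the class and method names here are placeholders):

```java
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class NonBlockingPublish {

    // Translator that copies the business value into the pre-allocated event.
    private static final EventTranslatorOneArg<OrderEvent, Long> TRANSLATOR =
            (event, sequence, value) -> event.setValue(value);

    // Returns false instead of blocking when the ring buffer is full.
    public static boolean report(RingBuffer<OrderEvent> ringBuffer, long value) {
        boolean published = ringBuffer.tryPublishEvent(TRANSLATOR, value);
        if (!published) {
            // Degrade gracefully: drop the sample, log a warning, or fall back to another path.
        }
        return published;
    }
}
```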
9. Conclusion

This article first introduced the high-performance lock-free memory queue Disruptor by comparing it with the JDK's built-in thread-safe queues. It then covered Disruptor's core concepts and basic usage to give readers a preliminary understanding, followed by its core implementation and the principles behind its high performance (space preallocation, avoiding false sharing, lock-free design, and batch consumption) from the perspective of the source code. Next, Disruptor's application in practice was illustrated with the iTheme business. Finally, based on the preceding analysis and practice, some best-practice suggestions for using Disruptor were summarized.

Reference articles:
https://time.geekbang.org/column/article/132477
https://lmax-exchange.github.io/disruptor/