RxJava is a very popular framework at present. Its convenient thread switching has always been talked about by people. This article will give an in-depth understanding of RxJava's thread model from the perspective of source code. (Note: Many codes in this article are not the original RxJava source code, but pseudocode used to illustrate the logic) Getting Started Switching threads in RxJava is very simple. For example, the most common asynchronous thread processing and the main thread callback model can be handled elegantly with the following code:
As mentioned above, subscribeOn(Schedulers.io()) ensures that the doExpensiveWork function occurs in the io thread, and observeOn(AndroidSchedulers.mainThread()) ensures that the subscribe callback occurs in the Android main thread. So, this naturally leads to the key point of this article, what is the difference between subscribeOn and observeOn? Process Analysis To answer the above question, we first need to have a general understanding of the RxJava process. From the generation of an Observable to the final execution of subscribe, it can go through n transformations in the middle. Each transformation will generate a new Observable, just like the torch relay at the opening of the Olympics. Each time the torch is passed to the next person, the last torchbearer will light the Olympic flame, that is, the last Observable will execute the subscribe operation. Therefore, there must be a connection between each Observable. This relationship is reflected in the code that each transformed Observable will hold a reference to the OnSubscribe object in the previous Observable (the parameter required by the Observable.create function). The key code in the subscribe function of the final Observable is this sentence:
This observable is the last transformed observable, so who is this onSubscribe object? If an observable is not transformed and subscribe is executed directly, of course it is the onSubscribe we passed in create, but if it is transformed by map, reduce, etc., this onSubscribe should obviously be the parameter passed in to create the transformed observable. Most transformations are ultimately handed over to the lift function:
Therefore, the onSubscribe object mentioned above should be an instance of OnSubscribeLift, and the two parameters received by OnSubscribeLift are the OnSubscribe object in the previous Observable mentioned above, and operator is an abstract interface for each transformation. Let's look at the call method of the OnSubscribeLift object:
operator and parent are the two parameters mentioned above. As you can see, the operator interface has a call method that receives a Subscriber and returns a new Subscriber object. The following parent.call(st) is to call back the onSubscribe call method of the previous observable, and so on until the last onSubscribe is called. In this way, we first sorted out a line, that is, after the last observable is subscribed, the order of OnSubscribe calls is from back to front. This brings up another question. From the code above, we can see that the operator.call(o) method has been executed before parent.call(st). If the transformation operation is executed in the call method, then it seems that the transformation will also be passed from back to front? So this operator.call method is definitely not as simple as we think. Here we take the map operator as an example and look at the source code:
As expected, no transformation operation is performed here, but a MapSubscriber object is generated. Here we need to pay attention to the two parameters of the MapSubscriber constructor. Transformer is the Func1 object that actually performs the transformation. This is easy to understand. So which one is the Subscriber for o? What does it mean? For example: o1 -> o2 -> subscribe(Subscriber s0); o1 is transformed into o2 after the map operation. o2 executes the subscribe operation. If you understand the above, you can know that the execution order of this process is that s0 will be passed to o2 first, and the lift operation of o2 will convert s0 into s1 and pass it to o1. Then in the call(final Subscriber<? super R> s) method that generates the map operation of o2, who is s worth? Is it s0 or s1? The answer should be s0, which is its next-level Subscriber. The reason is very simple. The MapSubscriber object parent returned in the call method is s1. So, let's take a look at what MapSubscriber's onNext method does?
It's pretty clear, first the transformation is performed, then the onNext function of the next level is called back. So far, we have a general understanding of the entire process of an observable from the initialization to the transformation and then to the subscription. Simply put, when o1 is transformed into o2 through map, it can be understood that o2 has hooked o1 and will go through two processes. First, the call process of the onSubscribe object will flow from o2 to o1, which we call process a. After reaching o1, o1 will start the Subscriber's onNext series of processes, which we call process b. Process b is the process that actually performs the transformation, and its direction is from o1 to o2. Understanding this, we can further understand the thread model in RxJava. Tip: You must have a deep understanding of the difference between process A and process B. This is crucial to understanding thread switching below. Switching mode RxJava's abstraction for the thread model is Scheduler, which is an abstract class containing one abstract method:
What is this Worker? It is actually an abstract inner class of Scheduler, which mainly contains two abstract methods:
It can be seen that Worker is the main force of thread execution. One of the two methods is used to execute tasks immediately, and the other is used to execute delayed tasks. Scheduler is the factory of Worker, which is used to provide Worker to the outside world. There are two common ways to switch threads in RxJava, namely subscribeOn transformation and observeOn transformation, both of which receive the same Scheduler as a parameter. Next, let's compare the differences between the two from the source code level. subscribeOn First look at the subscribeOn part
Create a new Observable, the parameter passed in is OperatorSubscribeOn, obviously this should be an implementation of OnSubscribe, pay attention to the call implementation method of this OperatorSubscribeOn:
Here is the key point. We mentioned process a and process b above. First of all, let’s make it clear that the execution time of this call method is process a, which means that this call occurs before process b. In the call method, the Worker - inner object is first created through the external scheduler, and then a piece of code is executed in the inner. Magically, the call method code in Action0 is executed in the worker thread, which means that the process is switched at this moment. Note the last sentence of code source.unsafeSubscribe(s). source represents the creation of the OperatorSubscribeOn object, which is the previous Observable passed in. The source code of this sentence is as follows:
This is similar to the parent.call(st) function in the call method of the OnSubscribeLift object in the lift method mentioned above, which is to associate the current Observable with the previous Observable through onSubscribe. So far, we can roughly understand the principle of subscribeOn. It will switch threads in process a. However, since process a is actually the code of the serial relationship between Observables, and it flows from the later Observable to the previous Observable, this implies that for process b, the earliest subscribeOn will block the subsequent subscribeOn! For example:
In this code, both the doExpensiveWork function and the doAction function will be triggered in the io thread. observeOn Understanding subscribeOn will make it easier to understand observeOn. The observeOn function will eventually be converted to this function:
Obviously, this is a lift operation. We need to pay attention to the Operator OperatorObserveOn and check its call method:
What is returned here is an ObserveOnSubscriber object. Let's focus on the onNext function of this Subscriber.
It simply executes the schedule function. Let's take a look at this schedule:
What is the recursiveScheduler.schedule that comes in here? It is not magical. It is the worker created by the scheduler passed in by the ObserveOnSubscriber constructor:
Therefore, magic is generated again, and observeOn switches threads in its onNext. When is this onNext executed? As we can see from the above, it is in process b. Therefore, observeOn will affect the subsequent processes until the next observeOn occurs or ends. Peripheral skills Choice of threading model RxJava has several built-in thread models for us, the main differences are as follows:
Executed in the current thread. Unlike immediate, it does not execute immediately, but stores it in a queue and waits for other tasks in the current Scheduler to be executed after they are completed. This is not often used in our daily lives. It mainly serves special transformation operations such as repeat and retry.
Scheduler.Worker steals the show In fact, the Worker in RxJava can be completely extracted for my use, such as the following writing method, which is to open a new thread to execute an action.
Of course, you have to choose the right time to close (unsubscribe) the worker to free up resources. Operators with their own halo Some operators have default thread models, for example, repeat and retry mentioned above will be executed in the trampoline thread model by default, and buffer, debounce and the like will switch to computation by default. I won't go into details here, but just remember when you encounter certain problems that some characters come with their own equipment and auras. Summarize The most important thing to understand about RxJava's thread model is to distinguish it from our usual understanding of asynchrony:
This is the code we often wrote before. Usually, we only distinguish between UI threads and non-UI threads. After the doAsync function starts, the program is split. One thread is executing a doAsync, and the other thread is executing code b. RxJava takes a different approach and abstracts the entire thread. The processing order of RxJava is like a stream. This is not only reflected in the fact that the code is written like a chain, but also in logic. For Observable itself, changing the thread only changes the track of the stream, not splitting it. In Android, non-UI threads often process data, and UI threads display data, which is just a way to change this stream. As far as I understand, it is not appropriate to understand RxJava's thread switching as asynchronous, non-asynchronous, blocking, or non-blocking. For now, it can only be understood as transformation. So amazing! |
<<: What are the correct steps to develop an app?
>>: Aiti Tribe Stories (9): A Female Programmer's Development Dream
Nibiru H1 is fashionable and cost-effective, and ...
According to Streaming Media Network: On July 11 ...
Recently, a netizen complained to Clippings that ...
In 2016, Qihoo and Dasheng were gradually no long...
Are men richer or women richer? This is a very no...
appendix: 1. A little-known fact about body shape...
Review expert: Peng Guoqiu, deputy chief physicia...
About 40 years ago, unconventional superconductor...
On July 26 and 27, local time, at the 46th UNESCO...
The topic about Kuaibo and Wang Xin is very sensi...
This article mainly analyzes how to conduct membe...
The Economist published an article titled "T...
In recent days, I believe that many of our "...
1. Core factors affecting conversion rate 1. Flow...
This article is an original article from Science ...