To understand a system, we usually start with its architecture. What we care about is: after the system is deployed, which services run on each node, and how those services interact and coordinate with each other. Below is the architecture diagram of a started Flink cluster. When a Flink cluster starts, one JobManager and one or more TaskManagers are launched. The Client submits a job to the JobManager, which dispatches the job's tasks to the TaskManagers for execution; the TaskManagers in turn report heartbeats and statistics back to the JobManager. Data is exchanged between TaskManagers in the form of streams. All three components run as independent JVM processes.
As you can see, Flink's task scheduling uses a multi-threaded model: tasks from different jobs can be mixed within a single TaskManager process. Although this approach can effectively improve CPU utilization, I personally dislike the design, because it provides no resource isolation between tasks and makes debugging inconvenient. A process model like Storm's, where one JVM runs only the tasks of a single job, is more practical in real applications.

Job Example

The examples shown in this article are based on Flink 1.0.x. We use SocketTextStreamWordCount from the examples package that ships with Flink, which counts word occurrences read from a socket stream.
Type words into the netcat terminal and watch the TaskManager's output to see the word counts. The code of SocketTextStreamWordCount is as follows:

```java
public static void main(String[] args) throws Exception {
    // Check input
    final ParameterTool params = ParameterTool.fromArgs(args);
    ...
    // set up the execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataStream<String> text = env.socketTextStream(
        params.get("hostname"), params.getInt("port"), '\n', 0);

    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word, 1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .sum(1);

    counts.print();

    // execute program
    env.execute("WordCount from SocketTextStream Example");
}
```

If we replace the last line, env.execute(...), with System.out.println(env.getExecutionPlan()); and run the code locally (with the parallelism set to 2), we get a JSON string describing the topology's logical execution plan. Pasting that JSON into http://flink.apache.org/visualizer/ renders the plan as a graph. Note, however, that this is not the final graph executed by Flink; it is a plan that describes the relationships between topology nodes, and it corresponds to the StreamGraph in Flink. In addition, after submitting the topology (again with parallelism 2), you can see another execution plan graph in the web UI, as shown below; that one corresponds to the JobGraph in Flink.

Graph

This may look a bit messy: why are there so many different graphs? In fact, there are even more. Flink's execution graphs can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.
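The Tokenizer used by the example is not shown above. As a hedged illustration of what such a flatMap function does (splitting each line into lower-cased (word, 1) pairs), here is a minimal plain-Java sketch with no Flink dependency; the class and method names are illustrative, not Flink's API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.AbstractMap.SimpleEntry;

// Illustrative stand-in for the example's Tokenizer flatMap:
// split a line into words and emit a (word, 1) pair for each.
public class TokenizerSketch {
    public static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        // Normalize case and split on non-word characters, as the
        // Flink WordCount examples typically do.
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("To be or not to be"));
    }
}
```

The downstream keyBy(0).sum(1) then groups these pairs by the word and sums the counts.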
For example, the figure below shows the evolution of the four graph layers for the SocketTextStreamWordCount job above with parallelism 2 (the Source has parallelism 1); click to view the enlarged figure. First, a brief explanation of some of the terms involved.
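As a rough mental model of one of these transitions (using toy types, not Flink's real classes): the ExecutionGraph is a parallelized JobGraph, so each JobGraph vertex with parallelism n expands into n execution vertices, one per parallel subtask. A minimal sketch:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Flink's actual classes): an ExecutionGraph is a
// parallelized JobGraph, so a vertex with parallelism n expands
// into n execution vertices, one per parallel subtask.
public class GraphLayersSketch {
    public record JobVertex(String name, int parallelism) {}
    public record ExecutionVertex(String name, int subtaskIndex) {}

    public static List<ExecutionVertex> expand(List<JobVertex> jobGraph) {
        List<ExecutionVertex> executionGraph = new ArrayList<>();
        for (JobVertex v : jobGraph) {
            for (int i = 0; i < v.parallelism(); i++) {
                executionGraph.add(new ExecutionVertex(v.name(), i));
            }
        }
        return executionGraph;
    }

    public static void main(String[] args) {
        // Mirrors the example's setup: the Source has parallelism 1,
        // the downstream vertices run with parallelism 2.
        List<JobVertex> jobGraph = List.of(
            new JobVertex("Source", 1),
            new JobVertex("Flat Map -> Aggregation", 2),
            new JobVertex("Sink", 2));
        System.out.println(expand(jobGraph).size()); // 5 execution vertices
    }
}
```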
So why does Flink design these four graphs? What is their purpose? Spark likewise has multiple graphs: a data dependency graph and a physical execution DAG. The purpose is the same: decoupling. Each graph has its own role and corresponds to a different stage of the job, which makes it easier to do the work of that stage. Here we give a more complete layered view of Flink's graphs. First, note that besides StreamGraph there is also an OptimizedPlan above JobGraph: the OptimizedPlan is converted from the Batch API, while the StreamGraph is converted from the Stream API. Why can't the APIs be converted directly into a JobGraph? Because the graph structures and optimization methods of Batch and Stream differ greatly. For example, Batch performs a lot of pre-execution analysis to optimize the graph, and those optimizations do not universally apply to Stream. Optimizing Batch through a separate OptimizedPlan is therefore more convenient and clearer, and it does not affect Stream. The responsibility of the JobGraph is to unify the Batch and Stream graphs, describe the structure of a topology clearly, and perform the chaining optimization; chaining applies to both Batch and Stream, so it is done at this layer. The responsibility of the ExecutionGraph is to make scheduling and the monitoring and tracking of each task's status convenient, so the ExecutionGraph is a parallelized JobGraph. The "physical execution graph" is the set of tasks ultimately distributed to and running on the various machines. This layered decoupling greatly facilitates the work done at each layer, and keeps the layers isolated from one another.
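The chaining mentioned above merges adjacent operators into a single task, so records pass between them as local method calls instead of going through serialization and the network. A simplified, hedged sketch of the chaining decision follows; Flink's real checks involve more conditions (chaining strategy, slot sharing groups, etc.), and this toy version keeps only a few essential ones: the downstream operator has exactly one input, both operators have the same parallelism, and the edge between them is a local forward edge rather than a keyBy/rebalance shuffle.

```java
// Simplified sketch (not Flink's actual implementation) of the
// operator-chaining decision made when a StreamGraph is turned
// into a JobGraph.
public class ChainingSketch {
    public record Op(String name, int parallelism) {}

    public static boolean chainable(Op upstream, Op downstream,
                                    boolean isForwardEdge, int downstreamInputs) {
        return downstreamInputs == 1                                  // single input
            && upstream.parallelism() == downstream.parallelism()     // same parallelism
            && isForwardEdge;                                         // no shuffle between them
    }

    public static void main(String[] args) {
        Op flatMap = new Op("Flat Map", 2);
        Op sum = new Op("Keyed Aggregation", 2);
        Op sink = new Op("Sink", 2);
        // keyBy introduces a hash-partitioned edge, so Flat Map -> Sum
        // is not chainable, while Sum -> Sink (forward edge) is.
        System.out.println(chainable(flatMap, sum, false, 1)); // false
        System.out.println(chainable(sum, sink, true, 1));     // true
    }
}
```

This matches what the WordCount JobGraph shows: the keyed aggregation cannot chain to the Flat Map across the keyBy, but it can chain to the Sink that follows it.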