Flink Principles and Implementation: Architecture and Topology Overview

To understand a system, we usually start with its architecture. The questions we care about are: once the system is deployed, which services run on each node, and how do those services interact and coordinate? Below is the architecture diagram of a running Flink cluster.

When a Flink cluster starts, it launches a JobManager and one or more TaskManagers. The Client submits a job to the JobManager, which dispatches the job's tasks to the TaskManagers for execution. Each TaskManager reports heartbeats and statistics back to the JobManager, and data flows between TaskManagers as streams. All three components run as independent JVM processes.

  • Client: the client that submits the job; it can run on any machine that can reach the JobManager. After submitting a job, the Client can either exit the process (typical for streaming jobs) or stay attached and wait for the result to be returned.
  • JobManager: mainly responsible for scheduling the job and coordinating tasks to perform checkpoints; its responsibilities are very similar to Storm's Nimbus. After receiving the job and resources such as JAR packages from the Client, it generates an optimized execution plan and dispatches it, in units of tasks, to the TaskManagers for execution.
  • TaskManager: its number of slots is fixed at startup. Each slot can run one task, and each task is a thread. The TaskManager receives the tasks to deploy from the JobManager; once a task is deployed and started, it establishes a Netty connection with its upstream to receive and process data.
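The per-TaskManager slot count is set at startup; in a standalone deployment it typically comes from conf/flink-conf.yaml. A minimal illustrative fragment (the value shown is an example, not a recommendation):

```yaml
# Number of task slots offered by each TaskManager; each slot runs one
# task (a thread), so this bounds per-process task concurrency.
taskmanager.numberOfTaskSlots: 4
```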

It can be seen that Flink's task scheduling uses a multi-threaded model: tasks from different jobs can run mixed together inside one TaskManager process. While this can effectively improve CPU utilization, I personally dislike the design, because it lacks a resource isolation mechanism between tasks and makes debugging inconvenient. A process model like Storm's, where a single JVM runs only the tasks of one job, is more reasonable in practice.

Job Example

The examples shown in this article are based on the flink-1.0.x release.

We use SocketTextStreamWordCount from the examples package that comes with Flink. This is an example of counting the number of word occurrences from a socket stream.

  • First, start the local server using netcat:

    $ nc -l 9000
    
  • Then submit the Flink program

    $ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \
     --hostname 10.218.130.9 \
     --port 9000
    

Type words into the netcat terminal and watch the TaskManager's output to see the word-count results.

The specific code of SocketTextStreamWordCount is as follows:

public static void main(String[] args) throws Exception {
    // check input
    final ParameterTool params = ParameterTool.fromArgs(args);
    ...

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataStream<String> text =
        env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);

    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word, 1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .sum(1);
    counts.print();

    // execute program
    env.execute("WordCount from SocketTextStream Example");
}
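The Tokenizer used above is defined elsewhere in the example; its core logic simply splits a line into lowercase words and pairs each word with an initial count of 1. Here is a plain-Java sketch of that logic (in the real example it is wrapped in Flink's FlatMapFunction and the pairs are Tuple2<String, Integer> emitted through a Collector; the class and method names below are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the Tokenizer's core logic: split a line into
// lowercase words and pair each word with a count of 1.
class TokenizerSketch {
    static List<String[]> tokenize(String line) {
        List<String[]> out = new ArrayList<>();
        // normalize case, then split on runs of non-word characters
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.add(new String[] { token, "1" }); // (word, 1)
            }
        }
        return out;
    }
}
```

Each emitted (word, 1) pair is later grouped by the word and summed on the count field, producing the running word counts.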

If we replace the last line, env.execute, with System.out.println(env.getExecutionPlan()); and run the program locally (with parallelism set to 2), we get a JSON string describing the topology's logical execution plan. Paste the JSON string into http://flink.apache.org/visualizer/ to visualize the execution graph.

However, this is not the final graph executed inside Flink; it is a plan graph that only represents the relationships between topology nodes, corresponding to the StreamGraph in Flink. In addition, after submitting the topology (with parallelism set to 2), you can see another execution plan graph in the web UI, as shown below, which corresponds to the JobGraph in Flink.

This may look confusing: why are there so many different graphs? In fact, there are even more. Flink's execution graphs can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.

  • StreamGraph: It is the initial graph generated based on the code written by the user through the Stream API. It is used to represent the topology of the program.
  • JobGraph: StreamGraph is optimized to generate JobGraph, a data structure submitted to JobManager. The main optimization is to chain multiple qualified nodes together as one node, which can reduce the serialization/deserialization/transmission consumption required for data flow between nodes.
  • ExecutionGraph: The distributed execution graph generated by JobManager based on JobGraph is the core data structure of the scheduling layer.
  • Physical execution graph: the "graph" formed after the JobManager schedules the Job according to the ExecutionGraph and the Tasks are deployed to the TaskManagers. It is not a concrete data structure.
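The chaining optimization performed at the JobGraph layer can be summarized as a predicate over adjacent nodes. A condensed, hypothetical sketch of the main conditions (the real check lives in Flink's job graph generation and covers more cases; the parameter names here are illustrative):

```java
// Condensed sketch of the "can these two operators be chained?" decision
// made while translating a StreamGraph into a JobGraph.
class ChainingSketch {
    static boolean canChain(int upstreamParallelism,
                            int downstreamParallelism,
                            boolean isForwardPartitioner,
                            int downstreamInEdges,
                            boolean chainingEnabled) {
        return chainingEnabled
                && downstreamInEdges == 1                 // downstream has a single input edge
                && isForwardPartitioner                   // one-to-one data forwarding, no shuffle
                && upstreamParallelism == downstreamParallelism; // same parallelism
    }
}
```

In the WordCount example, keyBy introduces hash partitioning, so flatMap cannot be chained with the keyed aggregation, whereas the aggregation and the print sink (forward connection, same parallelism) can be chained into one node.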

For example, the evolution of the four-layer execution graph of the above SocketTextStreamWordCount job with parallelism 2 (the Source has parallelism 1) is shown in the following figure:

Here is a brief explanation of the terms.

  • StreamGraph: The initial graph generated based on the code written by the user through the Stream API.

    • StreamNode: A class used to represent an operator and has all relevant properties, such as concurrency, incoming and outgoing edges, etc.
    • StreamEdge: represents the edge connecting two StreamNodes.
  • JobGraph: StreamGraph is optimized to generate JobGraph, which is the data structure submitted to JobManager.

    • JobVertex: After optimization, multiple StreamNodes that meet the conditions may be chained together to generate a JobVertex, that is, a JobVertex contains one or more operators. The input of JobVertex is JobEdge and the output is IntermediateDataSet.
    • IntermediateDataSet: represents the output of JobVertex, that is, the data set generated by operator processing. The producer is JobVertex and the consumer is JobEdge.
    • JobEdge: represents a data transmission channel in the job graph. The source is IntermediateDataSet and the target is JobVertex. That is, data is transferred from IntermediateDataSet to the target JobVertex through JobEdge.
  • ExecutionGraph: The distributed execution graph generated by JobManager based on JobGraph is the core data structure of the scheduling layer.

    • ExecutionJobVertex: corresponds to a JobVertex in the JobGraph. Each ExecutionJobVertex has as many ExecutionVertices as its parallelism.
    • ExecutionVertex: represents one of the parallel subtasks of an ExecutionJobVertex, with ExecutionEdges as input and IntermediateResultPartitions as output.
    • IntermediateResult: corresponds one-to-one to an IntermediateDataSet in the JobGraph. Each IntermediateResult has as many IntermediateResultPartitions as the operator's parallelism.
    • IntermediateResultPartition: represents one output partition of an ExecutionVertex. The producer is the ExecutionVertex; the consumers are one or more ExecutionEdges.
    • ExecutionEdge: represents an input of an ExecutionVertex. The source is an IntermediateResultPartition and the target is an ExecutionVertex; each edge has exactly one source and one target.
    • Execution: one attempt to execute an ExecutionVertex. When a failure occurs or data needs to be recomputed, an ExecutionVertex may have multiple Executions, each uniquely identified by an ExecutionAttemptID. Task deployment and task-status updates between the JobManager and TaskManager use the ExecutionAttemptID to identify the message recipient.
  • Physical execution graph: the "graph" formed after the JobManager schedules the Job according to the ExecutionGraph and the Tasks are deployed to the TaskManagers. It is not a concrete data structure.

    • Task: After the Execution is scheduled, the corresponding Task is started in the assigned TaskManager. Task wraps the operator with the user execution logic.
    • ResultPartition: represents the data generated by a Task and corresponds one-to-one with the IntermediateResultPartition in the ExecutionGraph.
    • ResultSubpartition: is a subpartition of ResultPartition. Each ResultPartition contains multiple ResultSubpartitions, the number of which is determined by the number of downstream consuming Tasks and DistributionPattern.
    • InputGate: represents the input encapsulation of a Task, and corresponds one-to-one to a JobEdge in the JobGraph. Each InputGate consumes one or more ResultPartitions.
    • InputChannel: Each InputGate will contain one or more InputChannels, which correspond one-to-one to the ExecutionEdge in the ExecutionGraph and are also connected one-to-one to the ResultSubpartition, that is, one InputChannel receives the output of one ResultSubpartition.
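The one-to-one relations listed above imply simple cardinalities at the physical layer. An illustrative sketch, assuming an all-to-all DistributionPattern between one upstream and one downstream vertex (the method names are made up for this sketch):

```java
// Illustrative cardinality rules for the physical layer under an
// all-to-all DistributionPattern between two vertices.
class PhysicalGraphCardinalities {
    // Each ResultPartition holds one ResultSubpartition per consuming task.
    static int subpartitionsPerResultPartition(int downstreamParallelism) {
        return downstreamParallelism;
    }
    // Each downstream InputGate holds one InputChannel per upstream partition,
    // since channels connect one-to-one to ResultSubpartitions.
    static int inputChannelsPerGate(int upstreamParallelism) {
        return upstreamParallelism;
    }
}
```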

So why does Flink design these four graphs, and for what purpose? Spark likewise has multiple graphs: the data dependency graph and the physical execution DAG. The purpose is the same: decoupling. Each graph has its own function and corresponds to a different stage of the job, making it easier to do the work of that stage. Below is a more complete hierarchical view of Flink's graphs.

First, we can see that besides StreamGraph, there is also an OptimizedPlan above JobGraph. The OptimizedPlan is converted from the Batch API, while the StreamGraph is converted from the Stream API. Why can't the APIs be converted directly into a JobGraph? Because the graph structures and optimization methods of Batch and Stream differ greatly: for example, Batch performs a lot of pre-execution analysis to optimize the graph, and those optimizations do not apply universally to Stream. Optimizing Batch through a separate OptimizedPlan is therefore more convenient and clear, and does not affect Stream.

The responsibility of the JobGraph is to unify the Batch and Stream graphs, describe the structure of a topology clearly, and perform the chaining optimization; chaining applies to both Batch and Stream, so it is done at this layer. The responsibility of the ExecutionGraph is to facilitate scheduling and the monitoring and tracking of each task's status, so the ExecutionGraph is a parallelized JobGraph. The "physical execution graph" is simply the set of tasks ultimately distributed and running on the machines. This decoupling greatly simplifies the work at each layer, and the layers are isolated from one another.
