In-depth optimization and production practice of Flink engine in Kuaishou

Abstract: This article is compiled from the speech given by Liu Jiangang, a technical expert from Kuaishou's real-time computing team, at the Flink Forward Asia 2021 production practice session. The main contents include:

  1. The history and current status of Kuaishou Flink
  2. Flink fault tolerance improvement
  3. Flink engine control and practice
  4. Kuaishou batch processing practice
  5. Future plans

01 History and Current Status of Kuaishou Flink

Kuaishou began to deeply integrate Flink in 2018. After four years of development, the real-time computing platform has gradually matured, and various surrounding components have been built up around it.

  • In 2018, we built a platform for Flink 1.4 and significantly improved our operation and maintenance management capabilities, making it ready for production.
  • In 2019, we started iterative development based on version 1.6, and many businesses began to move to real-time processing. For example, optimized interval joins brought significant benefits to the commercialization platform, and real-time multidimensional analysis accelerated the production of large multidimensional reports. In the same year, our Flink SQL platform was also put into use.
  • In 2020, we upgraded to 1.10 and made many improvements to the SQL features. At the same time, we further optimized Flink's core engine to ensure its ease of use, stability, and maintainability.
  • In 2021, we began to focus on offline computing, supported the construction of an integrated lakehouse, and further improved the Flink ecosystem.

The above picture shows Kuaishou's technology stack based on Flink.

  • The core and the bottom layer is Flink's computing engine, which includes stream computing and batch processing. We have done a lot of work on stability and performance.
  • The outer layer is the peripheral components that interact with Flink, including middleware such as Kafka and RocketMQ, data analysis tools such as ClickHouse and Hive, and data lakes such as Hudi. Users can build various applications based on Flink and these components, covering various scenarios such as real-time, near real-time, and batch processing.
  • The outermost layer consists of the concrete usage scenarios. Video-related businesses such as e-commerce and commercialization are common users, and typical applications include machine learning and multi-dimensional analysis. In addition, many technical departments use Flink for data import and conversion, such as CDC and lakehouse integration.

In terms of application scale, we have 500,000 CPU cores, hosted mainly on Yarn and K8s. More than 2,000 jobs run on them, with a peak throughput of 600 million records per second and 31.7 trillion records processed per day. During holidays or special events, the traffic can even double.

02 Flink Fault Tolerance Improvements

Fault tolerance mainly includes the following parts:

  • The first is single-point recovery, which supports in-place restart when any number of tasks fail, so long-running jobs can essentially keep running without interruption.
  • The second is the response to cluster failures, including cold standby, hot standby, and the integration of Flink with dual-cluster Kafka.
  • The last is the use of blacklists.

In order to provide exactly-once semantics, Flink restarts the entire job whenever any node fails, and a global restart causes a long pause, sometimes more than ten minutes. Some scenarios, such as real-time recommendation, do not require exactly-once but have high availability requirements and cannot tolerate job interruptions. Other scenarios, such as model training, have slow initialization, so a restart takes particularly long and has a large impact. Based on these considerations, we developed the single-point recovery feature.

The figure above shows the basic principle of single-point recovery. There are three tasks, and the middle one fails. First, the Flink master node reschedules the middle task; the upstream and downstream tasks do not fail but wait for reconnection. After the middle task is successfully redeployed, the master node notifies the downstream task to reconnect to it. The middle task also reconnects to its upstream task and resumes reading data by rebuilding the read view. Once upstream and downstream are connected again, the job works normally.

After understanding the basic principle, let's look at how multiple tasks are recovered online. In real environments, several tasks often fail at the same time. In that case, we recover the failed tasks one by one in topological order; in the figure above, for example, recovery proceeds from left to right.
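To make the recovery flow concrete, here is a minimal, hypothetical sketch (plain Java, not Flink's internal scheduler API) of recovering failed tasks one by one in topological order, following the three steps described above:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of single-point recovery in topological order (not Flink's internal API). */
public class SinglePointRecoverySketch {

    /** A failed task and its position in the topological order of the job graph. */
    record FailedTask(String name, int topologicalIndex) {}

    static void recover(List<FailedTask> failedTasks) {
        // Recover upstream tasks before downstream ones (left to right in the figure).
        failedTasks.sort(Comparator.comparingInt(FailedTask::topologicalIndex));
        for (FailedTask task : failedTasks) {
            // 1. The master reschedules only the failed task; its neighbours wait instead of failing.
            System.out.println("reschedule " + task.name());
            // 2. Downstream tasks are notified to reconnect to the newly deployed task.
            System.out.println("notify downstream of " + task.name() + " to reconnect");
            // 3. The new task reconnects to its upstream and rebuilds its read view.
            System.out.println("rebuild read view for " + task.name());
        }
    }

    public static void main(String[] args) {
        recover(new ArrayList<>(List.of(
                new FailedTask("window-2", 2),
                new FailedTask("map-1", 1))));
    }
}
```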

After this feature was launched, nearly 100 jobs within the company adopted it. Under common failure conditions jobs keep running; apart from small traffic fluctuations, the business side notices nothing and has completely bid farewell to the nightmare of service interruptions.

Once a cluster failure occurs, it is fatal. All data will be lost and the service will crash. Our solution mainly includes cold standby, hot standby, and dual cluster integration of Flink and Kafka.

Cold standby mainly refers to backing up data. After a cluster fails, computing tasks can be quickly started in another cluster.

As shown in the figure above, KwaiJobManager is Kuaishou's job management service, in which the failover coordinator is responsible for fault handling. We store all jar packages and other files in HDFS and all metadata in MySQL; both are highly available. The job runs in the primary cluster, cluster A. Incremental snapshots are used online, which introduces file dependency problems, so we regularly take savepoints and copy them to the backup cluster. To avoid accumulating too many files, we also schedule the deletion of historical snapshots.
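The savepoint routine described above can be pictured roughly as follows. This is a hedged sketch with assumed interval and retention values; the helper methods stand in for the real savepoint trigger, the cross-cluster HDFS copy, and the cleanup logic:

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the cold-standby savepoint loop. */
public class ColdStandbySavepointLoop {

    private static final Duration INTERVAL = Duration.ofMinutes(30); // assumed interval
    private static final int RETAINED_SNAPSHOTS = 3;                 // assumed retention

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            // 1. Trigger a self-contained savepoint (avoids incremental-snapshot file dependencies).
            String savepointPath = triggerSavepoint("jobId-123");
            // 2. Copy it to the backup cluster's HDFS so cluster B can restore from it.
            copyToBackupCluster(savepointPath);
            // 3. Delete old snapshots to keep the number of retained files bounded.
            deleteOldSnapshots(RETAINED_SNAPSHOTS);
        }, 0, INTERVAL.toMinutes(), TimeUnit.MINUTES);
    }

    // Hypothetical helpers; in practice these would call the Flink REST API and HDFS clients.
    static String triggerSavepoint(String jobId) { return "hdfs://clusterA/savepoints/" + jobId; }
    static void copyToBackupCluster(String path) { /* distcp-style copy to cluster B */ }
    static void deleteOldSnapshots(int keep)     { /* retention cleanup */ }
}
```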

Once the service detects a failure in cluster A, it immediately starts the job in cluster B and restores it from the most recent snapshot to ensure that no state is lost. Users only need to configure the primary and backup clusters; the platform takes care of the rest, and the whole failover is transparent to them.

Hot standby means that two clusters run the same job at the same time. Our hot standby is full-link: Kafka, ClickHouse, and the rest of the pipeline all run in duplicate. The display layer at the top uses only one copy of the result data; once a failure occurs, it immediately switches to the other copy. The switchover takes less than one second and is imperceptible to users.

Compared with cold standby, hot standby needs the same amount of resources again to run the backup, but it switches faster, which makes it more suitable for scenarios with extremely high requirements such as the Spring Festival Gala.

The dual-cluster integration of Flink and Kafka exists mainly because Kuaishou's Kafka provides dual-cluster capabilities, so Flink needs to support reading and writing dual-cluster Kafka topics and can switch over seamlessly online when one Kafka cluster crashes. As shown in the figure above, we abstracted dual-cluster Kafka inside Flink: one logical topic corresponds to two physical topics at the bottom layer, each composed of multiple partitions, and consuming the logical topic is equivalent to reading data from the two underlying physical topics at the same time.

We abstract all cluster changes into expansion and contraction of partitions. For example, a cluster failure can be seen as a contraction of the logical topic's partitions; switching from a single cluster to two clusters can be seen as an expansion; and topic migration can be seen as an expansion followed by a contraction. Here we use two clusters as an example, but whether there are two clusters or more, the principle is the same and we support both.
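A simplified sketch of the logical-to-physical mapping, using hypothetical classes rather than Kuaishou's actual connector: shrinking one cluster's partition list to nothing models a cluster failure, and adding a second cluster models an expansion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a logical Kafka topic backed by multiple physical topics. */
public class LogicalTopicSketch {

    record PhysicalPartition(String cluster, String topic, int partition) {}

    /** Resolve a logical topic into the full list of physical partitions to consume. */
    static List<PhysicalPartition> resolve(String logicalTopic,
                                           Map<String, Integer> partitionsPerCluster) {
        List<PhysicalPartition> result = new ArrayList<>();
        partitionsPerCluster.forEach((cluster, partitionCount) -> {
            for (int p = 0; p < partitionCount; p++) {
                result.add(new PhysicalPartition(cluster, logicalTopic, p));
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Normal operation: the logical topic spans two clusters.
        System.out.println(resolve("orders", Map.of("kafka-a", 4, "kafka-b", 4)));
        // Cluster failure == the logical topic "shrinks" to the surviving cluster's partitions.
        System.out.println(resolve("orders", Map.of("kafka-b", 4)));
    }
}
```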

The blacklist function is needed in two situations. The first is that a faulty machine is repeatedly scheduled, causing the job to fail frequently. The second is that, due to hardware or network problems, some Flink nodes on a machine get stuck without actually failing.

For the first case, we developed a threshold-based blacklist: if a job fails or fails to deploy on the same machine more times than the configured threshold, the machine is blacklisted. For the second case, we established an exception classification mechanism that directly removes containers and blacklists machines with network or disk hangs. In addition, we exposed the blacklist interface to external systems such as Yarn to realize real-time blacklisting. Taking the Flink blacklist as an opportunity, we also established a complete hardware exception handling process that achieves automatic job migration and fully automated operation and maintenance without user perception.
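A minimal sketch of the threshold-based blacklist for the first case, with an assumed threshold value; the real mechanism also covers deployment failures, exception classification, and integration with Yarn:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a per-host failure-count blacklist. */
public class ThresholdBlacklist {

    private final int failureThreshold;                      // configured threshold
    private final Map<String, Integer> failures = new HashMap<>();

    ThresholdBlacklist(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    /** Record a task failure on a host; return true if the host is now blacklisted. */
    boolean onTaskFailure(String host) {
        int count = failures.merge(host, 1, Integer::sum);
        return count >= failureThreshold;
    }

    public static void main(String[] args) {
        ThresholdBlacklist blacklist = new ThresholdBlacklist(3); // assumed threshold of 3 failures
        for (int i = 0; i < 3; i++) {
            if (blacklist.onTaskFailure("host-42")) {
                System.out.println("host-42 blacklisted; avoid scheduling new containers there");
            }
        }
    }
}
```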

03 Flink Engine Control and Practice

3.1 Flink Real-time Control

For long-running real-time jobs, users often need to change parameters to adjust job behavior, and there are also system operations such as downgrading a job or changing the log level. All of these changes require restarting the job to take effect, which sometimes takes from several minutes to tens of minutes. In some critical situations this is intolerable: during an event or at a key point of troubleshooting, stopping the job may waste all the ongoing work. Therefore, we need to adjust a job's behavior in real time without stopping it, which is what we call real-time control.

Viewed more broadly, Flink is not only a computing task but also a long-running service. Our real-time control is built on this idea and provides an interactive control mode for real-time computing. As shown in the figure above, users interact with the Flink dispatcher through classic key-value messages. After receiving a message, Flink first persists it to ZooKeeper for failover and then performs the corresponding control based on the message content, for example controlling the resource manager, the job master, or other components.

We support user-defined dynamic parameters and also provide many ready-made system controls. For user customization, a RichFunction obtains the dynamic parameters and implements the corresponding logic, so that parameters can be passed in while the job is running to achieve real-time control.
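As an illustration, the sketch below shows what such a user function might look like. `DynamicParams` is an assumed stand-in for the platform's control channel, not a Flink or Kuaishou API; the point is that the new parameter value takes effect without restarting the job.

```java
import java.util.function.Consumer;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Hypothetical sketch of a user function driven by a dynamic parameter. */
public class ControlledSampler extends RichFlatMapFunction<String, String> {

    private transient volatile double sampleRate;

    @Override
    public void open(Configuration parameters) {
        sampleRate = 1.0; // default until a control message arrives
        // Assumed helper: register a listener for key/value control messages from the platform.
        DynamicParams.subscribe("sample.rate", value -> sampleRate = Double.parseDouble(value));
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        // The updated sample rate takes effect immediately, without restarting the job.
        if (Math.random() < sampleRate) {
            out.collect(value);
        }
    }

    /** Assumed stub: in the real system this would be backed by the ZK-persisted control messages. */
    static class DynamicParams {
        static void subscribe(String key, Consumer<String> onUpdate) { /* no-op in this sketch */ }
    }
}
```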

The real-time control capabilities provided by the system mainly include data source rate limit, sampling, resetting Kafka offset, adjusting snapshot parameters, and operation and maintenance related functions such as changing log level and blacklisting nodes. In addition, we also support dynamic modification of some Flink native configurations.

Kuaishou has productized the real-time control function internally, making it very convenient for users.

3.2 Source Control Capabilities

When a Flink job is catching up on historical data or its performance cannot keep up with the incoming traffic, the following problems may occur:

First, the source subtasks consume at inconsistent speeds, which aggravates problems such as out-of-order data, data loss, and slow watermark alignment. Second, snapshots keep growing, seriously affecting job performance. In addition, the traffic is uncontrolled, which causes stability issues such as CPU saturation and OOM under high load.

Since Flink processes data in a pipelined, real-time fashion, controlling the data source can solve these problems at the root.

First, let's look at accurate replay of historical data. The figure above shows Kafka's historical data being consumed at twice the real-time rate; after the Flink job catches up with the lag, it switches to real-time consumption. This approach effectively solves the stability problems of complex jobs.

The formula in the preceding figure is a basic principle: consumption ratio = Kafka time difference / Flink system time difference. Users only need to configure the ratio when using it.
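As a hedged illustration of this pacing rule, the sketch below delays each record so that the consumed Kafka event time never advances faster than ratio × elapsed system time; the class and its fields are hypothetical, not the actual connector code.

```java
/** Hypothetical sketch of rate-controlled historical replay: Kafka event time is
 *  allowed to advance at most `ratio` times faster than wall-clock time. */
public class ReplayPacer {

    private final double ratio;            // e.g. 2.0 => consume history at twice real time
    private final long startSystemMs = System.currentTimeMillis();
    private final long startKafkaMs;       // event time of the first consumed record

    ReplayPacer(double ratio, long startKafkaMs) {
        this.ratio = ratio;
        this.startKafkaMs = startKafkaMs;
    }

    /** Block until the record with the given Kafka timestamp may be emitted. */
    void pace(long recordKafkaMs) throws InterruptedException {
        long kafkaDelta = recordKafkaMs - startKafkaMs;
        long allowedSystemDelta = (long) (kafkaDelta / ratio); // ratio = kafkaDelta / systemDelta
        long wakeUpAt = startSystemMs + allowedSystemDelta;
        long sleep = wakeUpAt - System.currentTimeMillis();
        if (sleep > 0) {
            Thread.sleep(sleep);
        }
    }
}
```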

Another capability is QPS rate limiting. When the data volume is large, Flink becomes overloaded and the job unstable. Based on the token bucket algorithm, we implemented a distributed rate-limiting strategy that effectively reduces the pressure on Flink. After applying the QPS limit, the job becomes very healthy, as can be seen in the green part of the figure above. We used this technology to guarantee flexible availability of the 2019 Spring Festival Gala big screen.
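A rough sketch of the idea, assuming the global QPS limit is split evenly among the source subtasks and each subtask enforces its share with a local token bucket (the actual distributed strategy may differ):

```java
/** Hypothetical sketch: a global QPS limit divided evenly among source subtasks,
 *  each enforcing its share with a local token bucket. */
public class SubtaskRateLimiter {

    private final double tokensPerSecond;   // this subtask's share of the global QPS
    private final double capacity;          // burst capacity
    private double tokens;
    private long lastRefillNanos = System.nanoTime();

    SubtaskRateLimiter(double globalQps, int parallelism) {
        this.tokensPerSecond = globalQps / parallelism;
        this.capacity = tokensPerSecond;     // allow roughly one second of burst
        this.tokens = capacity;
    }

    /** Returns true if one record may be emitted now, consuming a token. */
    synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        // Refill tokens proportionally to the elapsed time, capped at the bucket capacity.
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) / 1e9 * tokensPerSecond);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```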

In addition, we also support automatic adaptation of partition changes and real-time control, and users can adjust the QPS of the job anytime and anywhere.

The last function is data source alignment, which mainly means watermark alignment. Each subtask regularly reports its watermark progress to the master node, including the watermark value and its advancing speed. The master node computes the target for the next cycle, that is, the expected maximum watermark plus an allowed diff, and returns it to each node. Each source task then ensures that its watermark in the next cycle does not exceed this target. The formula at the bottom of the figure predicts the watermark value of each task at the end of the next cycle, adds the allowed maxDiff, and takes the maximum. In this way, the progress of all sources stays consistent, avoiding stability problems caused by an excessive gap between them.
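The coordinator-side target computation can be sketched as follows, using hypothetical types; each source would then cap its watermark at the returned target for the next cycle:

```java
import java.util.List;

/** Hypothetical sketch of the coordinator-side watermark-alignment target. */
public class WatermarkAlignment {

    record SourceProgress(long watermarkMs, double watermarkSpeedMsPerSec) {}

    /**
     * target = max over sources of (watermark + speed * cycle) + maxDiff.
     * Every source keeps its watermark below this target during the next cycle.
     */
    static long nextTarget(List<SourceProgress> reports, long cycleSeconds, long maxDiffMs) {
        long predictedMax = reports.stream()
                .mapToLong(r -> r.watermarkMs() + (long) (r.watermarkSpeedMsPerSec() * cycleSeconds))
                .max()
                .orElse(Long.MIN_VALUE);
        return predictedMax + maxDiffMs;
    }

    public static void main(String[] args) {
        long target = nextTarget(List.of(
                new SourceProgress(1_000_000L, 1000),   // a fast source
                new SourceProgress(  990_000L,  800)),  // a slower source
                10, 5_000);
        System.out.println("watermark target for next cycle: " + target);
    }
}
```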

3.3 Job Balanced Scheduling

Resource imbalance often occurs in production environments. For example, Flink tasks are distributed unevenly, which leads to uneven resource usage across task managers, and job performance is then limited by the busiest node. To address this, we developed a balanced job scheduling strategy. The second problem is uneven CPU usage: some machines are fully loaded while others are idle. To address this, we developed CPU-balanced scheduling.

In the figure above, three jobVertexes are connected by hash shuffles. The middle part of the figure shows Flink's default scheduling: each jobVertex schedules its tasks into slots from top to bottom, so the first two slots are full while the others are idle, and the first task manager is full while the second is idle. This is a typical resource skew, and we optimized it. When scheduling, we first calculate the total resources required, that is, how many task managers are needed; then we compute how many slots each TM should host so that slot resources are balanced across TMs; finally, we distribute tasks evenly across the slots so that the load within each slot is balanced.
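A small arithmetic sketch of the balanced allocation, with assumed numbers: the slots needed by the job are spread evenly over the task managers, and tasks are then placed round-robin over the slots.

```java
/** Hypothetical sketch of balanced slot and task placement. */
public class BalancedScheduling {

    public static void main(String[] args) {
        int totalSlotsNeeded = 6;     // assumed: slots required by the job
        int slotsPerTaskManager = 4;  // assumed TM size

        // 1. Total task managers required.
        int taskManagers = (int) Math.ceil((double) totalSlotsNeeded / slotsPerTaskManager);

        // 2. Spread the slots evenly across task managers instead of filling the first TM.
        int baseSlots = totalSlotsNeeded / taskManagers;
        int remainder = totalSlotsNeeded % taskManagers;
        for (int tm = 0; tm < taskManagers; tm++) {
            int slots = baseSlots + (tm < remainder ? 1 : 0);
            System.out.println("TM-" + tm + " gets " + slots + " slots");
        }

        // 3. Tasks of each jobVertex are then distributed round-robin over all slots,
        //    so no single slot concentrates the tasks of many vertexes.
    }
}
```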

Another kind of skew appears at runtime: CPU skew. Let's see how we solve it. On the left of the figure above, one job applied for one core but actually used only 0.5 cores, while another applied for one core and used the full core. Under the default scheduling policy, many such cases cause some machines to run at high CPU utilization while others stay idle, and heavily loaded machines suffer in both performance and stability. The question is how to minimize the gap between requested and actually used resources.

Our solution is to profile job resources accurately. The approach has the following steps: while the job is running, we record the CPU usage of each task's container and build the mapping from task to executionSlotSharingGroup and then to container, so we know the CPU usage of each task's slot. When the job is restarted, we apply for resources according to the historical CPU usage of each slot as given by this mapping, usually reserving some buffer. As shown on the right of the figure above, if the prediction is accurate enough, the resources actually used by the task manager remain unchanged after the restart, while the requested amount decreases, so the diff between the two shrinks.
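A minimal sketch of the re-sizing step, with an assumed 20% buffer: the new CPU request for each executionSlotSharingGroup is derived from its observed historical peak usage.

```java
import java.util.Map;

/** Hypothetical sketch of CPU request re-sizing based on resource profiling. */
public class ResourceProfilingSketch {

    /** New request = observed peak CPU of the slot-sharing group's container + a buffer. */
    static double recommendedCpu(double observedPeakCores, double bufferRatio) {
        return observedPeakCores * (1.0 + bufferRatio);
    }

    public static void main(String[] args) {
        // Historical peak CPU usage per executionSlotSharingGroup, collected while the job ran.
        Map<String, Double> observedPeak = Map.of("group-0", 0.5, "group-1", 1.0);
        observedPeak.forEach((group, peak) ->
                System.out.printf("%s: request %.2f cores (previously requested 1.00)%n",
                        group, recommendedCpu(peak, 0.2)));   // assumed 20% buffer
    }
}
```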

In fact, some advanced systems in the industry, such as Borg, support dynamically modifying the requested resources, but our underlying resource scheduler does not, so we can only solve the problem with resource profiling at the Flink layer. Of course, resource profiling cannot be 100% accurate, so we also apply other strategies, such as preventing machines with high CPU load from receiving further allocations, to minimize imbalance. In addition, we established a tiered guarantee system with different cgroup restrictions for jobs of different priorities: low-priority jobs are no longer allowed to overuse CPU, while high-priority jobs are allowed a small amount of overuse, thereby avoiding imbalance caused by excessive CPU consumption.

04 Kuaishou Batch Processing Practice

The figure above shows our batch processing architecture. The bottom layer is the offline cluster; the middle is the Flink engine with its DataStream API and SQL API; above that are platform facilities such as the SQL portal and the scheduled-job platform. On top of these sit some stream-batch unification explorations, and at the very top are the various users, such as the video and commercialization teams.

In stream-batch unification, the characteristic of streaming is low latency, and the characteristic of batch is high throughput. For a unified system, we expect it to process bounded and unbounded data in a unified way and to adjust the size of shuffled data blocks to balance the job's throughput and latency.

Kuaishou has explored stream-batch unification extensively. We established a unified schema standard for stored data, covering both stream tables and batch tables, so users can process them with the same code and only different configurations. The results they produce also need to follow the unified schema standard, so that upstream and downstream can be connected and as much logic as possible can be reused. Schema unification is part of Kuaishou's data governance, and scenarios such as lakehouse integration have the same demand.
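As an illustration of "same code, different configuration", the hedged sketch below runs one SQL statement in either streaming or batch mode depending on a flag. The table name is illustrative and assumed to be registered in a unified catalog, and the API shown is the standard Flink Table API rather than Kuaishou's internal platform entry point.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Hedged sketch: the same SQL runs over a stream table or a batch table; only the mode differs. */
public class UnifiedQuery {

    public static void main(String[] args) {
        boolean batch = Boolean.parseBoolean(System.getProperty("runAsBatch", "false"));

        EnvironmentSettings settings = batch
                ? EnvironmentSettings.newInstance().inBatchMode().build()
                : EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Both tables follow the same unified schema, so the query body is identical.
        // "user_actions" is an illustrative table assumed to exist in the catalog.
        tEnv.executeSql(
                "SELECT item_id, COUNT(*) AS clicks " +
                "FROM user_actions GROUP BY item_id");
    }
}
```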

The application scenarios mainly include the following aspects:

  • Indicator calculations, such as real-time indicators and report calculations.
  • Data backtracking: using existing offline data to regenerate other indicators.
  • Data warehouse acceleration mainly refers to the real-time acceleration of data warehouses and data lakes.

The benefits of stream-batch unification are multifaceted. First, it reduces development and operations costs: code logic is reused as much as possible, and operations no longer need to maintain multiple systems. Second, the calculation caliber of real-time and batch processing is consistent, which guarantees consistent final results. Finally, there are resource benefits, since some scenarios only need a single real-time pipeline.

We have also optimized scheduling. As shown in the figure above, among the three tasks a, b, and c, a and c have finished and b is still running when a fails. Under the default strategy, all of a, b, and c need to be re-run, even though c has already finished. In real scenarios, a large number of tasks like c would be recomputed, wasting huge amounts of resources. For this situation, we enable the following strategy by default: if a's output is deterministic (and most batch outputs are), c does not need to be recomputed; only a and b are re-run.
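A small sketch of that decision (hypothetical types): finished consumers of a deterministic producer keep their results, while everything else is re-run.

```java
import java.util.List;

/** Hypothetical sketch of choosing which tasks to re-run after a producer failure. */
public class BatchFailoverSketch {

    record Task(String name, boolean finished, boolean deterministicOutput) {}

    /** Finished consumers of a deterministic producer keep their results; everyone else re-runs. */
    static List<String> tasksToRerun(Task failedProducer, List<Task> consumers) {
        return consumers.stream()
                .filter(c -> !(c.finished() && failedProducer.deterministicOutput()))
                .map(Task::name)
                .toList();
    }

    public static void main(String[] args) {
        Task a = new Task("a", true, true);                 // failed, but its output is deterministic
        List<Task> downstream = List.of(
                new Task("b", false, true),                 // still running, must re-run
                new Task("c", true, true));                 // finished, result can be kept
        System.out.println("re-run: a and " + tasksToRerun(a, downstream)); // prints [b]
    }
}
```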

The above picture shows our internal optimization and improvement of batch processing.

The first is the shuffle service: we have an internal integration and are also trying out the community version, mainly to decouple storage from computation while improving shuffle performance. The second is dynamic resource scheduling, which automatically determines operator parallelism based on the data volume, avoiding repeated manual tuning. The third is slow-node avoidance, also known as speculative execution, which mainly reduces the long-tail effect and the total execution time. The fourth is Hive optimization, such as UDF adaptation and syntax compatibility; for partition split generation, we added caching, multi-threaded generation, and other techniques, which greatly reduce the splitting time. Finally, we support compression formats such as gzip and zstd.

05 Future Planning

Our future plans are mainly divided into the following aspects:

  • The first is real-time computing, which further enhances Flink's performance, stability, and applicability, and accelerates various business scenarios through real-time computing.
  • The second is the unification of online and offline, including real-time, near real-time and batch processing. We expect to use Flink to unify Kuaishou's data synchronization, conversion and offline computing, so that all scenarios such as ETL, data warehouse, and data lake processing can use a set of Flink computing systems.
  • The last one is elastic scalability, which is mainly related to cloud native, including offline co-location and elastic scaling of jobs.
