Large-scale offline inference based on Ray

Offline inference for large models

Features

Offline inference for large models (batch inference) refers to running distributed inference with large models that have billions to hundreds of billions of parameters. It has the following characteristics:

  1. Inference is performed on a whole batch of data at a time. The data volume is usually massive, so the computation usually runs offline;
  2. An inference job generally includes both data processing and model inference;
  3. Jobs are usually large, use distributed computing, and consume a lot of computing resources;
  4. Compared with online inference, offline inference has loose latency requirements and focuses mainly on throughput and resource utilization.

Key Challenges

GPU Memory Wall

Figure: Key challenges of offline inference for large models: GPU memory wall

The first challenge is memory. Machine learning models keep getting bigger, and model sizes have grown especially quickly since the Transformer architecture appeared. As the figure above shows, parameter counts in machine learning have risen very rapidly over the past few years. GPU hardware has improved much more slowly by comparison, so the gap between the two keeps widening. The consequence is that, during inference or training, a model may no longer fit in GPU memory and has to be split.

Figure: Model splitting

There are two common ways to split a model, as shown on the left side of the figure above:

  • Pipeline parallelism: split by layer
  • Tensor parallelism: split by weight

Splitting by layer is the simpler of the two: the layers of the model are divided into groups, and each group is placed on a different GPU. In the upper-left part of the figure, for example, there are two GPUs; the first holds L0-L3 and the second holds L4-L7. Because layers differ in size, the split is not necessarily even: a very large layer may occupy a GPU by itself, while several smaller layers may be packed onto one GPU.
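Splitting by layer can be illustrated with a small sketch. The function below greedily packs consecutive layers into groups that each fit within one GPU's memory budget. The layer sizes, the 12 GB budget, and the name partition_layers are assumptions made for illustration; this is not the partitioning logic used by the authors.

```python
from typing import List


def partition_layers(layer_sizes_gb: List[float], gpu_memory_gb: float) -> List[List[int]]:
    """Greedily pack consecutive layers into groups that each fit on one GPU."""
    groups: List[List[int]] = []
    current: List[int] = []
    used = 0.0
    for idx, size in enumerate(layer_sizes_gb):
        if size > gpu_memory_gb:
            raise ValueError(f"layer {idx} ({size} GB) does not fit on one GPU")
        # Start a new group when adding this layer would exceed the budget.
        if current and used + size > gpu_memory_gb:
            groups.append(current)
            current, used = [], 0.0
        current.append(idx)
        used += size
    if current:
        groups.append(current)
    return groups


# Hypothetical sizes: one very large layer followed by smaller ones.
layer_groups = partition_layers(
    [11.0, 2.0, 2.0, 3.0, 3.0, 6.0, 1.0, 1.0], gpu_memory_gb=12.0
)
print(layer_groups)
```

With these made-up sizes, the large first layer ends up alone in its group while the smaller layers share GPUs, which is the uneven distribution described above.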

Splitting by weight means splitting the weights of a single layer and placing the pieces on different GPUs. In the lower-left part of the figure, for example, part A0 of the weights of L0 is placed on GPU 0 and the other part, A1, on GPU 1. During inference, the final result is obtained by combining the partial results of the matrix operations. Beyond these two methods there are more complex schemes, such as hybrid approaches that combine both, or ZeRO-style partitioning.
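The weight-splitting idea can be checked with a tiny worked example. The sketch below splits a weight matrix by columns into two shards (standing in for A0 and A1), multiplies the input by each shard separately, and concatenates the partial outputs. It uses plain Python lists rather than a GPU library, and the matrices and helper names are hypothetical.

```python
from typing import List

Matrix = List[List[float]]


def matmul(x: Matrix, w: Matrix) -> Matrix:
    """Plain matrix multiplication on nested lists."""
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*w)] for row in x]


def split_columns(w: Matrix, parts: int) -> List[Matrix]:
    """Split a weight matrix into `parts` equally wide column shards."""
    n_cols = len(w[0])
    step = n_cols // parts
    if step * parts != n_cols:
        raise ValueError("number of columns must be divisible by parts")
    return [[row[i * step:(i + 1) * step] for row in w] for i in range(parts)]


x = [[1.0, 2.0], [3.0, 4.0]]                          # input activations
w = [[1.0, 0.0, 2.0, 1.0], [0.0, 1.0, 1.0, 3.0]]      # full weight of one layer

shards = split_columns(w, parts=2)                    # A0 for "GPU 0", A1 for "GPU 1"
partials = [matmul(x, shard) for shard in shards]     # each device computes its part
# Concatenate the partial outputs row by row to rebuild the full result.
combined = [sum(rows, []) for rows in zip(*partials)]

print(combined == matmul(x, w))
```

A column split only needs a concatenation at the end; a row split would instead require summing the partial results, which is why real tensor-parallel implementations pair the two carefully.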

Splitting the model has the following advantages:

  1. Support for larger models: larger models can be served for offline inference on existing hardware;
  2. Lower cost: a split model can run on cards with less GPU memory, which frees higher-end cards for training, since training consumes more resources;
  3. Space-division multiplexing: this technique is now used in many scenarios, for example NVIDIA's Multi-Process Service (MPS), which shares a GPU among processes by space and can improve GPU utilization. In that setup each process gets only part of the GPU memory. An unsplit model might need the entire card, whereas a split model can still run offline inference in this scenario.

Distributed Scheduling

Figure: Key challenges of offline inference for large models: distributed scheduling

The second challenge is about distributed scheduling. There are two requirements:

The first is support for heterogeneous resources. As mentioned earlier, an inference job usually involves both data processing and model inference. Ideally, data processing runs on CPUs so that it does not occupy GPUs, which are reserved for inference. This requires the framework to support heterogeneous resource scheduling in a user-friendly way.

The second is elastic resource scheduling. After splitting, the model is divided into groups, and at run time each group can be regarded as a stage. Because each group contains different layers, the stages need different amounts of compute. These needs are hard to estimate before a job runs, so without elasticity users must repeatedly tune parameters to reach good efficiency. We therefore want the computing framework to scale each stage automatically at run time according to its observed efficiency, so that fast stages give up some compute to slow ones.
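To make the goal of elastic scheduling concrete, here is a minimal sketch of one possible balancing rule: every stage starts with one worker, and each remaining worker goes to whichever stage is currently the bottleneck (highest per-item cost divided by its worker count). The per-stage costs, the worker budget, and the name allocate_workers are hypothetical; the article does not describe its scheduler at this level of detail.

```python
from typing import List


def allocate_workers(stage_cost: List[float], total_workers: int) -> List[int]:
    """Distribute workers so that per-stage throughput is as balanced as possible.

    stage_cost[i] is the (measured or assumed) time one worker of stage i
    needs per item; a stage's effective cost is stage_cost[i] / workers[i].
    """
    if total_workers < len(stage_cost):
        raise ValueError("need at least one worker per stage")
    workers = [1] * len(stage_cost)
    for _ in range(total_workers - len(stage_cost)):
        # The slowest stage limits the whole pipeline, so it gets the next worker.
        bottleneck = max(
            range(len(stage_cost)), key=lambda i: stage_cost[i] / workers[i]
        )
        workers[bottleneck] += 1
    return workers


# Hypothetical three-stage job where the middle stage is four times as expensive.
allocation = allocate_workers([1.0, 4.0, 2.0], total_workers=7)
print(allocation)
```

In a running job the costs would come from live measurements, and the allocation would be recomputed as they change; that feedback loop is what makes the scheduling elastic.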

Mainstream computing frameworks such as Flink and Spark cannot easily meet these two requirements, mainly because they are tied to relatively fixed batch or stream computing paradigms and are not flexible enough at the scheduling level.

Performance

In terms of performance, since this is an offline computing job, we want throughput and GPU utilization to be as high as possible.

First, data should move between stages easily and efficiently. The serialization overhead of writing data to storage should be avoided as much as possible; pure in-memory transfer is the better approach.

Second, on the inference side, waiting on data IO should be minimized so that IO does not leave the GPU idle and GPU utilization stays as high as possible.

Third, resource elasticity should be used to release GPUs with low utilization, thereby improving overall utilization.

Case

Figure: Case: ViT + ALBERT

The following is a real case and also a multimodal example: the ViT + ALBERT two-tower model. Here we split both models at once: one group contains ALBERT's layers and another contains ViT's layers. The embedding layer is usually large, so it is placed in a group of its own. The job has three stages in total, with image and text tokens passed between stages. The three stages need different amounts of computing resources, which is why the ability to allocate computing power elastically is required.

Using Ray to build a large model inference framework


About Ray

The Ray project was started at UC Berkeley's RISELab around 2017. RISELab's predecessor was the better-known AMPLab, the lab that incubated the Spark engine. After being renamed RISELab, the lab incubated Ray, which is positioned as a general-purpose, Python-first distributed programming framework. In principle, Ray lets users make any Python application distributed with little effort, especially machine learning applications, and machine learning is currently one of Ray's main directions. The project's founders also started a company based on Ray, Anyscale. The project currently has more than 20,000 stars on GitHub. In industry, companies such as Uber, OpenAI, Ant, and ByteDance have built applications on Ray.

Ray's architecture has three layers. The bottom layer is the various cloud infrastructures: Ray hides the underlying infrastructure from users, so once a Ray Cluster is up they can start distributed programming immediately without thinking about the cloud-native environment underneath. The middle layer is Ray Core, which provides Ray's fundamental capabilities, chiefly a very concise low-level distributed programming API. With this API, users can easily distribute existing Python programs. Notably, the API at this layer is low-level and not tied to any computing paradigm, which makes it very general. The top layer is the rich set of machine learning libraries that the Ray community has built on Ray Core. This layer targets the machine learning pipeline: data reading and processing, model training, hyperparameter optimization, inference, reinforcement learning, and so on can all be done with these libraries directly. This is also a direction the Ray community is currently focused on.

It is also worth mentioning that, according to public information from OpenAI, the training of ChatGPT, which became hugely popular this year, is built on Ray, including pre-training, fine-tuning, and reinforcement learning.

Ray Infrastructure

Figure: Basic architecture of a Ray Cluster

The figure above shows the basic architecture of a Ray Cluster, where each large box is a node. (A node here is a virtual concept: it can be a physical machine, a VM, or a Docker container. On K8s, for example, a node is a Pod.)

  • Head node: the scheduling center of the Ray Cluster. Its core component is GCS, which is responsible for global storage, scheduling, jobs, status, and so on. The Head node also hosts an observability dashboard.
  • Worker node: every node other than the Head node is a Worker node, which carries the actual workloads.
  • Raylet: each node runs a Raylet daemon, a local scheduler responsible for task scheduling and worker management.
  • Object Store: each node has an Object Store component, which is responsible for transferring objects between nodes. Together, the Object Stores of all nodes in the Cluster form a global distributed memory. On a single node, the Object Store also reduces copying by sharing memory between processes.
  • Driver: when a user submits a job to the Ray Cluster or connects from a notebook, Ray selects a node to run the Driver, which executes the user code. The Driver is destroyed after the job completes.
  • Worker: the process that hosts Tasks and Actors in Ray.

It is worth noting that, to provide a simple distributed programming experience, Ray puts a lot of design into the Raylet layer, and the implementation is fairly complex. Interested readers can consult the relevant papers.

Ray Distributed Programming

Figure: Ray distributed programming (left: Ray Core API; right: Ray libraries)

The left side of the figure above shows programming with the Ray Core API. Counter is an ordinary Python class. To make it distributed, you only need to add the @ray.remote decorator to the class, create the Actor and call its methods, and finally retrieve the value with ray.get. Because the Counter instance lives on a remote node, we also define a Task (a Python function) and use Objects for distributed data transfer.

The right side shows programming with a Ray library: training a simple machine learning model with Ray Train. You first define a model, exactly as you would in plain Python, then use the Ray Train API to fill in some configuration and start training.

These two styles, one low-level and one high-level, are both recommended ways of using Ray.

Building a large model inference framework based on Ray

Figure: Using Ray to build a large model inference framework: Ray Datasets

Ray Datasets

We chose Ray Datasets to build the large model inference framework. Ray Datasets offers many ways to connect to data sources, is compatible with the data sources commonly used in machine learning, provides common data processing operators, and supports general parallel computing such as offline batch inference. It also supports a Pipeline execution mode, which divides the data blocks into windows and greatly speeds up the overall parallel computation. In short, Ray Datasets is a very practical data processing tool that helps us build a large model inference framework more efficiently.

Figure: Using Ray to build a large model inference framework v1: based on the native Ray Datasets Pipeline

We therefore first tried to build a large model inference framework on the native Ray Datasets Pipeline.

The pseudocode on the left of the figure describes the execution flow. Assume the model is split by layer into two groups, ModelLayers1 and ModelLayers2. We call the Ray Datasets window API to create a Pipeline, then call map_batches to run inference on the two model groups in parallel. Setting the compute parameter to actors means that Datasets starts an Actor Pool for the computation of each subsequent map_batches call. The third parameter is the number of GPUs required by each compute Actor, and it directly affects the Actors behind that call. Even in a higher-level library such as Datasets, the API can still express heterogeneous resources easily.

Compared with Spark, Ray can significantly improve execution efficiency, and the advantage becomes more obvious as the job grows. Consider a simple example: we have only two GPUs, the model is split into two groups, and the job must process three data samples. With Spark, two Executors are started; each loads the parameters of the first model group, they process the three samples between them, and the outputs are written to external storage. The two Executors then load the parameters of the second model group, process the samples again in the same way, and finally write the results to external storage. The process is cumbersome and not very friendly to heterogeneous resources.

With Ray, you only need to start two Actors, corresponding to Spark's two Executors. Each Actor loads the parameters of one of the two model groups, so execution can be pipelined: the data samples pass through the two Actors in turn. It is also very convenient to add another Actor on the CPU to read or store data. The framework keeps intermediate results in the Ray Object Store; because this is purely in-memory storage, it avoids the overhead of serializing data to external storage and significantly improves execution efficiency. In scenarios like this, Ray is considerably more efficient than Spark.

This is the first version of the large model inference framework. It handles heterogeneous resource scheduling effectively, and in theory its execution efficiency exceeds that of comparable computing engines. However, we found that some problems remained:

  • Every Window creates and destroys its own Actor Pool, and the overhead of starting Actors and loading the model each time is too high;
  • Within each Actor, data IO and inference are not parallelized, which leads to low GPU utilization;
  • The Actor Pool is not elastic. It wastes resources when stages have different compute requirements, and parameters must be tuned repeatedly to work around this;
  • Tuning parameters through the API is difficult;
  • Fault tolerance and speculative execution are missing.

Figure: Using Ray to build a large model inference framework v2: streaming execution semantics in the Ray Datasets Pipeline

To solve these problems, we developed the second version of the inference framework, adding streaming execution semantics to the internal implementation of the Ray Datasets Pipeline. Stages are connected by queues. What travels through a queue is a Ray Object Reference rather than the data itself; the actual data stays on the Actor side. This is like passing an array of pointers between functions instead of the data itself.
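Passing references instead of data can be illustrated without Ray. The sketch below uses a hypothetical in-process ObjectStoreSketch class and a standard library queue: only a small integer reference travels through the queue, and the consumer resolves it when needed. It is an analogy for the mechanism described above, not Ray's Object Store or its API.

```python
import itertools
import queue
from typing import Any, Dict


class ObjectStoreSketch:
    """Toy stand-in for an object store: maps reference ids to stored values."""

    def __init__(self) -> None:
        self._objects: Dict[int, Any] = {}
        self._ids = itertools.count()

    def put(self, value: Any) -> int:
        ref = next(self._ids)
        self._objects[ref] = value
        return ref

    def get(self, ref: int) -> Any:
        return self._objects[ref]


store = ObjectStoreSketch()
stage_queue: "queue.Queue[int]" = queue.Queue(maxsize=4)

# Upstream stage: store a large batch once, enqueue only its reference.
batch = list(range(100_000))
stage_queue.put(store.put(batch))

# Downstream stage: dequeue the reference and resolve it to the data.
ref = stage_queue.get()
resolved = store.get(ref)
print(ref, len(resolved), resolved is batch)
```

The queue entry stays tiny regardless of batch size, so the queue itself never becomes the place where large data is copied or buffered.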

The second version differs from the first in that each Stage is backed by a long-lived Actor Pool, which is not torn down after it is created. At run time, a Stage reads Object References from its input queue and, for each one, selects an Actor from its own Actor Pool to process the data. Because the Actor Pool is custom-built, it can be elastic: the pool of a heavily loaded Stage actively tries to acquire more resources to increase its parallelism, while the pool of a lightly loaded Stage gradually becomes idle and eventually releases some Actors, giving up resources to the Stages that need them. This also depends on the scheduling strategy, that is, how a Stage chooses an Actor when dispatching data. We currently use a Most Recently Used strategy, which keeps busy Actors busy so that idle Actors stay idle and can be released easily. Inside each Actor, multithreading is used to run IO and inference in parallel, which improves GPU utilization.
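The effect of a Most Recently Used strategy can be seen in a small simulation. The sketch below assumes a lightly loaded stage that receives one task per logical tick, with every task finishing within its tick; Actors that have been idle for idle_timeout ticks are released. The class name ActorPoolSketch, the timeout, and the tick model are illustrative assumptions, not the framework's implementation.

```python
from typing import Dict, List


class ActorPoolSketch:
    """Toy actor pool that dispatches to the most recently used actor."""

    def __init__(self, actors: List[str], idle_timeout: int) -> None:
        self.last_used: Dict[str, int] = {name: 0 for name in actors}
        self.idle_timeout = idle_timeout

    def pick_most_recently_used(self, now: int) -> str:
        # Prefer the actor used most recently; break ties by name for determinism.
        chosen = max(self.last_used, key=lambda name: (self.last_used[name], name))
        self.last_used[chosen] = now
        return chosen

    def release_idle(self, now: int) -> List[str]:
        released = [
            name for name, used_at in self.last_used.items()
            if now - used_at >= self.idle_timeout
        ]
        for name in released:
            del self.last_used[name]
        return released


pool = ActorPoolSketch(["actor-0", "actor-1", "actor-2"], idle_timeout=3)
for tick in range(1, 6):
    pool.pick_most_recently_used(tick)   # one task arrives per tick
    pool.release_idle(tick)              # idle actors are given back

remaining = sorted(pool.last_used)
print(remaining)
```

Under a round-robin or least-recently-used policy, every Actor would be touched regularly and none would ever cross the idle threshold, so the pool could not shrink even though one Actor is enough for this load.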

Note that the queues between stages have a bounded length. This prevents an upstream stage from producing too much data and causing the job to run out of memory (OOM), and plays the same role as back pressure in stream computing. The pseudocode of the second version differs little from the first, so business users do not need to spend much effort migrating.
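The back-pressure effect of a bounded queue can be demonstrated with two threads and the standard library. In the sketch below, a fast upstream stage blocks on put whenever the queue is full, so the number of buffered items never exceeds the bound even though the downstream stage is slower. The function name run_pipeline, the queue length, and the sleep that stands in for inference are all assumptions for illustration.

```python
import queue
import threading
import time
from typing import List, Tuple


def run_pipeline(num_items: int, max_queue_len: int) -> Tuple[List[int], int]:
    """Run a two-stage pipeline and report results plus the peak queue length."""
    q: "queue.Queue" = queue.Queue(maxsize=max_queue_len)
    results: List[int] = []
    peak = 0
    sentinel = None  # marks the end of the stream

    def upstream() -> None:
        nonlocal peak
        for i in range(num_items):
            q.put(i)                      # blocks while the queue is full
            peak = max(peak, q.qsize())
        q.put(sentinel)

    def downstream() -> None:
        while True:
            item = q.get()
            if item is sentinel:
                break
            time.sleep(0.001)             # stand-in for slow model inference
            results.append(item * 2)

    threads = [threading.Thread(target=upstream), threading.Thread(target=downstream)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, peak


results, peak = run_pipeline(num_items=20, max_queue_len=2)
print(len(results), peak)
```

With an unbounded queue the upstream stage would finish almost immediately and all pending items would sit in memory at once; the bound trades a little upstream idle time for a fixed memory ceiling.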

Figure: Using Ray to build a large model inference framework v2: v2.3 community collaboration

While developing the second version, we noticed that the Ray open-source community was considering similar issues. The community published an official REP (Ray Enhancement Proposal) whose list of problems closely matched our goals, especially improving GPU utilization and making the Ray Datasets API parameters easier to configure. The community proposed a new architecture that separates Operators and Executors beneath the API layer, which adds flexibility and extensibility. Going forward, we will work with the community to offer our implementation as an Executor under the new architecture.

Ray cloud-native deployment practice

Figure: Ray cloud-native deployment: KubeRay

To deploy Ray we use KubeRay, a complete solution from the open-source community. As mentioned earlier, each Ray Cluster consists of a Head node and Worker nodes. Each node is a unit of computing resources, which can be a physical machine, a Docker container, and so on; on K8s it is a Pod. KubeRay's Operator manages the entire life cycle of a Ray Cluster, including creating and destroying Clusters. The project is quite active, and companies such as ByteDance, Microsoft, and Ant take part in its development and use it.

Within ByteDance, users access Ray through an internal platform, either by submitting jobs or by programming interactively in notebooks. The platform operates through the YAML definitions and RESTful APIs provided by KubeRay. KubeRay also supports autoscaling and horizontal scaling: the Ray Cluster collects load metrics internally and decides from them whether more resources are needed. If so, it triggers KubeRay to start new Pods or delete idle ones.

To sum up, we discussed offline inference for large models and its key challenges, and introduced how to use Ray to build a large model inference framework. In the future, we will continue to strengthen our cooperation with the community, optimize our platform, and explore more application scenarios on Ray.
