Data access technology in artificial intelligence online feature system

1. Online Feature System

In mainstream Internet products, whether in classic computational advertising, search, and recommendation, or in vertical domains such as route planning, driver dispatch, and intelligent material design, strategy systems based on artificial intelligence technology have penetrated every aspect of product functionality. Correspondingly, every strategy system depends on a large number of online features so that model algorithms or manual rules can respond accurately to requests, which makes the feature system an important pillar supporting online strategy systems. The Meituan Dianping technical blog has previously published several articles on feature systems: "Overview of Data Cleaning and Feature Processing in Machine Learning" focuses on offline data cleaning and mining methods in the feature production process, "Business Empowerment Tool Takeaway Feature Archives" focuses on using different storage engines to meet different feature data query requirements, and "Takeaway Sorting System Feature Production Framework" focuses on the feature production pipeline of feature calculation, data synchronization, and online query.

This article takes the Meituan Hotel and Travel online feature system as a prototype and, from the perspective of online data access, introduces some common technical points used in practice to solve the problems that online feature systems face under high concurrency.

1.1 Online feature system framework - integration of production, scheduling and service

The online feature system is an online service that retrieves relevant feature data based on the system context. It can be as simple as a Key-Value (KV) store that provides feature queries online, or it can extend to a full feature service system covering general feature production, unified feature scheduling, and real-time feature monitoring. A simple, usable feature system can be built in a few man-days, but in complex business scenarios, making it more convenient, fast, and stable requires a team's long-term accumulation.

The structure diagram above gives an overview of the integrated feature system; data flows from bottom to top. The functions of each part are as follows:

  • Data source: the raw data used to compute features. Depending on business needs, the data source may be a distributed data warehouse (such as Hive), a relational database (such as MySQL), a message queue (such as Kafka), etc.
  • Feature production: reads data from the various data sources and provides a computing framework for producing features. The production framework must be designed according to the type of data source and the different computing requirements, so there will be multiple production frameworks.
  • Feature import: writes the computed features to online storage for the feature service to read. The main concerns here are dependencies between import jobs, the speed and consistency of concurrent writes, and similar issues.
  • Feature service: the core functional part of the entire feature system, providing online feature storage and access and directly serving the upper-level strategy system.

Following the above process, the life cycle of a feature can be abstracted into five steps: read, compute, write, store, and retrieve. Within the feature system framework these steps form a whole, serving as an integrated solution for feature engineering. This article mainly shares some common practical experience around the core functions of the feature service, "storage" and "retrieval". Extensions of the feature system, such as feature production and the system framework, will be covered in detail in subsequent articles.

1.2 The core of the feature system - storage and retrieval

In simple terms, the core function of the feature system can be thought of as a large HashMap that stores, and quickly retrieves, the feature sets of the relevant dimensions for each request. In reality, however, it is not as simple as a HashMap. Taking the system indicators of our general online feature system (Datahub) as an example, its core functions mainly face the following storage and retrieval challenges:

  1. High concurrency: The strategy system is user-facing; server peak QPS exceeds 10,000, and database peak QPS exceeds 1 million (driven by batch requests).
  2. High throughput: Each request may contain thousands of features, so network IO is heavy. Average server egress traffic is 500 Mbps, with a peak of 1.5 Gbps.
  3. Big data: Although the feature data needed online is nowhere near as large as the offline Hive tables, the number of data items exceeds 1 billion and the total size reaches the TB level.
  4. Low latency: To preserve the user experience when responding to requests, interface latency must be as low as possible; the server-side TP99 needs to stay below 10 ms.

The above indicators are only a reference from our own system; feature systems in different teams and companies can vary greatly in scale. However, no matter how large or small a feature system is, its core goals are the same: high concurrency, high throughput, big data, and low latency, with different priorities among them. When a system is optimized toward multiple objectives, no single technique applied in isolation can satisfy every aspect with limited resources. What matters is identifying the most important characteristics of the business demand and the solutions that match those characteristics.

2. Online Feature Access Technology

This section introduces some access technologies commonly used in online feature systems, to enrich our arsenal. The content is not a detailed system design but a set of general solutions to some common problems. As mentioned in the previous section, choosing the appropriate techniques according to the strategy requirements and shaping the corresponding solution is the core value of every architect.

2.1 Data Stratification

When the total amount of feature data reaches the TB level, a single storage medium can hardly support the complete business need. The memory or cache of a high-performance online service is a drop in the bucket relative to the data volume, while distributed KV storage offers much larger capacity but is not fast enough in some scenarios. There are many open source distributed KV storage or cache solutions, such as Redis/Memcache, HBase, and Tair, each with a large number of contributors constantly improving their functionality and performance; this article will not go into further detail on them.

When building an online feature system, we need to understand what our feature data actually looks like. Some data is very hot, and a memory copy or cache can cover a large share of requests at minimal memory cost. Some data is not hot, but every access requires a stable and fast response, in which case a distributed storage solution based entirely on memory is a good choice. For data with a very large volume or very fast growth, we need a storage solution with disk backing, and the storage technology must be chosen according to the read and write distribution.

When the business develops to a certain scale, a single type of storage can hardly cover all business needs. Therefore, in storage selection, data should be stratified according to feature type; after stratification, the different storage engines provide feature data to strategy services through a unified interface, which is the best practice for maintaining both system performance and functionality.
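As a concrete illustration of stratification, here is a minimal sketch in Java, assuming three hypothetical tiers behind a single `FeatureStore` interface: an in-process cache for hot data, a full-memory distributed KV store for warm data, and a disk-backed store for cold or fast-growing data. The names and tiers are illustrative, not the actual Datahub implementation.

```java
import java.util.Optional;

// A minimal sketch of data stratification: three hypothetical storage tiers behind one
// FeatureStore interface, tried from fastest to largest.
interface FeatureStore {
    Optional<String> get(String featureKey);
}

class TieredFeatureStore implements FeatureStore {
    private final FeatureStore localCache;     // hot data: in-process memory
    private final FeatureStore distributedKv;  // warm data: full-memory distributed KV (e.g. Redis/Tair style)
    private final FeatureStore diskBacked;     // cold or fast-growing data: disk-backed store (e.g. HBase style)

    TieredFeatureStore(FeatureStore localCache, FeatureStore distributedKv, FeatureStore diskBacked) {
        this.localCache = localCache;
        this.distributedKv = distributedKv;
        this.diskBacked = diskBacked;
    }

    @Override
    public Optional<String> get(String featureKey) {
        Optional<String> v = localCache.get(featureKey);   // fastest, smallest tier first
        if (v.isPresent()) return v;
        v = distributedKv.get(featureKey);
        if (v.isPresent()) return v;
        return diskBacked.get(featureKey);                  // last resort: largest, slowest tier
    }
}
```

A lookup falls through from the fastest, smallest tier to the largest, slowest one; which data lands in which tier is decided by the hot/cold and growth characteristics discussed above.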

2.2 Data Compression

Loading a large number of offline features into the online system, and transferring them between systems, costs considerable memory, network bandwidth, and other resources. Data compression is a typical case of trading time for space: it can often reduce space usage several-fold, which is a great boon for precious online memory and bandwidth. The essential idea of data compression is to reduce information redundancy. For the feature system scenario, we have accumulated some practical experience to share.

2.2.1 Storage Format

Feature data is simply the feature name and feature value. Taking user portrait as an example, a user has features such as age, gender, hobbies, etc. There are usually the following ways to store such feature data:

  1. JSON format: fully retains the feature name-feature value pairs, expressed as a JSON string.
  2. Metadata extraction: as in Hive, the feature names (metadata) are saved separately, and the feature data is represented as a list of feature values in String form.
  3. Metadata solidification: the metadata is likewise saved separately, but each feature is defined with a strong type such as Integer or Double instead of a unified String type.

Each of the three formats has its own advantages and disadvantages:

  1. The advantage of the JSON format is that the number of features can vary. Taking user portraits as an example, user A may have age and gender tags while user B has place-of-origin and hobby tags; different users' tags can differ greatly and can still be stored conveniently. The disadvantage is that every record must store the feature names, so when the features are highly homogeneous a lot of redundant information is carried along.
  2. Metadata extraction is the opposite of the JSON format: only the feature values themselves are kept, and the feature names are stored separately as metadata, which removes the redundant feature names. The disadvantage is that the data format must be homogeneous, and if features need to be added or removed, the whole data set has to be refreshed after the metadata changes.
  3. Metadata solidification has the same advantages as metadata extraction and is even more space-efficient. However, accessing it requires a proprietary serialization implementation, which costs both implementation effort and read/write speed.

In a feature system, a batch of feature data is usually completely homogeneous. At the same time, to cope with batch requests under high concurrency, we adopt metadata extraction as the storage scheme in practice, which reduces space usage by roughly 2 to 10 times compared with the JSON format (the exact ratio depends on the length of the feature names, the number of features, and the types of the feature values). A small sketch of the two formats follows.
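The sketch below contrasts the JSON format with metadata extraction for a hypothetical user-portrait record; the schema, field names, and separator character are illustrative assumptions, not the actual Datahub encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// A minimal sketch of the "metadata extraction" format versus JSON for a user-portrait record.
// The schema (feature names) is stored once; each record stores only the values, in schema order.
public class StorageFormatSketch {
    // Stored once as metadata, e.g. in a schema table; illustrative feature names.
    static final List<String> SCHEMA = Arrays.asList("age", "gender", "hobby");

    public static void main(String[] args) {
        // JSON format: every record repeats the feature names.
        String jsonRecord = "{\"age\":\"28\",\"gender\":\"male\",\"hobby\":\"hiking\"}";

        // Metadata extraction: the record keeps only the values, joined in schema order.
        String extractedRecord = String.join("\u0001", "28", "male", "hiking");

        System.out.println("JSON bytes:      " + jsonRecord.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("Extracted bytes: " + extractedRecord.getBytes(StandardCharsets.UTF_8).length);
    }
}
```

Because the feature names appear only once in the shared schema, every additional record saves their full length.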

2.2.2 Byte Compression

When it comes to data compression, lossless byte compression algorithms come to mind first. The main idea of lossless compression is to represent frequently occurring patterns with shorter byte codes. Since the read/write pattern of the online feature system is one full bulk write followed by many record-by-record reads, compression has to be applied per record rather than globally. The mainstream short-text compression algorithms with Java implementations include Gzip, Snappy, Deflate, and LZ4. We ran two sets of experiments comparing these algorithms on three indicators: average per-record compression speed, average per-record decompression speed, and compression ratio.

Dataset: We selected two real online feature datasets, each with 100,000 feature records. The records are in plain text format, with an average length of 300-400 characters (600-800 bytes).

Compression algorithm: The Deflate algorithm has 1 to 9 compression levels. The higher the level, the greater the compression ratio, and the longer the operation takes. The LZ4 algorithm has two compression levels, which we represent with 0 and 1. In addition, LZ4 has different implementation versions: JNI, Java Unsafe, and Java Safe. For detailed differences, refer to https://github.com/lz4/lz4-java. I will not explain too much here.

In the experimental result graphs, the millisecond figures are the compression or decompression time of a single record, and the compression ratio is the byte length before compression divided by the byte length after compression. For all algorithms, compression/decompression time increases as the compression ratio increases. The Java Unsafe and Java Safe versions of LZ4 show obvious speed anomalies due to platform compatibility issues.

From the usage scenario (one full write, many record-by-record reads), the main service indicators of the feature system are response time under high concurrency and storage efficiency of the feature data. The indicators that feature compression really cares about are therefore fast decompression and a high compression ratio, while compression speed is not critical. Based on how each algorithm performed in the experiments above, Snappy fits our needs best.
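As a minimal sketch of per-record compression with the snappy-java library, using its byte-array `Snappy.compress`/`Snappy.uncompress` calls (the record content and separator below are made up for illustration):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.xerial.snappy.Snappy;  // snappy-java library

// A minimal sketch of per-record byte compression with Snappy: each feature record is
// compressed once at bulk-write time and decompressed on every read.
public class RecordCompression {
    public static byte[] compressRecord(String record) throws IOException {
        return Snappy.compress(record.getBytes(StandardCharsets.UTF_8));
    }

    public static String decompressRecord(byte[] compressed) throws IOException {
        return new String(Snappy.uncompress(compressed), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String record = "28\u0001male\u0001hiking";   // one feature record, values only
        byte[] stored = compressRecord(record);        // written once, in bulk
        String restored = decompressRecord(stored);    // read many times, one by one
        System.out.println(record.equals(restored));
    }
}
```

In this write-once/read-many pattern, the one-time compression cost is amortized across all subsequent reads, which is why decompression speed dominates the choice.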

2.2.3 Dictionary Compression

The essence of compression is to exploit commonality and re-encode the data without losing information, so as to reduce space usage. The byte compression in the previous section works on one record at a time, so it can only exploit commonality within that record and cannot take global commonality into account. For example, suppose a certain user-dimension feature has exactly the same value for every user: compressing record by record saves nothing, even though in reality there is only a single value repeated over and over. Even within a single record, long patterns may be missed because of the compression algorithm's limited window size. Building a dictionary from global statistics of the feature values, adding frequent patterns to it automatically or manually, and re-encoding against that dictionary overcomes these limitations of short-text byte compression.
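A minimal sketch of the dictionary idea, assuming a hypothetical codec whose dictionary is built offline from value-frequency statistics; a real implementation would also need a scheme (for example a reserved prefix) to keep codes from colliding with raw values:

```java
import java.util.HashMap;
import java.util.Map;

// A minimal sketch of dictionary compression: globally frequent feature values are replaced
// by short codes, removing cross-record repetition before (or instead of) byte compression.
public class DictionaryCodec {
    private final Map<String, String> valueToCode = new HashMap<>();
    private final Map<String, String> codeToValue = new HashMap<>();

    // Frequent patterns can be added automatically (from statistics) or manually.
    // In practice, codes should be distinguishable from raw values, e.g. via a reserved prefix.
    public void addEntry(String frequentValue, String shortCode) {
        valueToCode.put(frequentValue, shortCode);
        codeToValue.put(shortCode, frequentValue);
    }

    public String encode(String value) {
        return valueToCode.getOrDefault(value, value); // unknown values pass through unchanged
    }

    public String decode(String stored) {
        return codeToValue.getOrDefault(stored, stored);
    }
}
```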

2.3 Data Synchronization

When a large amount of feature data is required for each request and strategy calculation (for example, requesting thousands of advertiser features at a time), we need very strong online data acquisition capabilities. Among the different methods of storing features, accessing local memory is undoubtedly the solution with the best performance. To access feature data in local memory, we usually have two effective means: memory copy and client cache.

2.3.1 Memory Copy Technology

When the total amount of data is not large, the strategy-side consumer can keep a complete local mirror of the feature data. This mirror is called a memory copy. Using a memory copy is exactly like using local data; the consumer does not need to be aware that a remote data source exists. The memory copy has to be kept in sync with the data source through some protocol, and this kind of synchronization technology is what we call memory copy technology. In the online feature system scenario, the data source can be abstracted as a KV data set, and memory copy technology must synchronize this data set to the memory copy in full.

Push and Pull – Timeliness and Consistency

Generally speaking, there are two styles of data synchronization: Push and Pull. Push is relatively simple: relying on common message queue middleware, a data change can be delivered to the memory copies on demand. However, even if a highly reliable, no-duplicate, no-loss message notification is achieved (usually at great cost), the problem of bulk synchronization during initialization remains, so Push can only serve as a way to improve the timeliness of memory copies. Essentially, memory copy synchronization still depends on a Pull protocol. A very attractive property of Pull-style synchronization is idempotence: whether a synchronization fails or succeeds, it does not affect the next one.

There are many options for Pull protocols. The simplest is to pull all the data each time, which serves as the baseline protocol. Business needs, however, demand efficient synchronization, so more efficient Pull protocols matter. To reduce the amount of data pulled, these protocols essentially try to compute the most precise data difference (Diff) efficiently and then synchronize only those necessary changes. Below are two Pull-style synchronization protocols we have used in engineering practice.

Synchronization based on version number - Replay log (RedoLog) and degradation algorithm

When the data source is updated, for each data change, the version number-based synchronization algorithm will assign a unique incremental version number to this change, and use an update queue to record the data changes corresponding to all version numbers.

When the memory replica initiates a synchronization request, it carries the latest version number it held when it last completed synchronization, meaning that all data changes after this version number need to be pulled. After receiving the request, the data source finds all data changes with versions greater than this number in the update queue, merges them into the final Diff that needs to be applied, and returns it to the initiator. The memory replica then only needs to apply this Diff.

For most business scenarios, feature data generation is integrated into a unified update service, so the incremental version numbers can be generated serially. In a distributed data-update environment, a distributed ID generator is needed to obtain the incremental version numbers.

Another problem is the length of the update queue. Without optimization, the update queue grows without bound and may even exceed the size of the data set itself. One optimization is to cap the maximum queue length: once the limit is exceeded, a merge operation is performed. The merge combines the entries in the queue in pairs, taking the larger version number as the merged version number and the union of the two data sets as the merged update data. After the merge, the new queue is half the length of the original one.

For the merged queue, the same algorithm still computes the synchronization Diff: find all data sets in the queue with a version number greater than the replica's last update. Because version numbers have been merged, the computed Diff is no longer perfectly precise, and the earliest data set in it may contain some data that has already been synchronized. This degradation does not affect correctness; it only causes a small amount of synchronization redundancy, whose size depends on how many times the earliest data set in the Diff has been merged.
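A minimal sketch of the version-numbered update queue with pairwise merging, under the assumption of a single serial updater; the class and field names are illustrative, and a real implementation would also need persistence and finer-grained locking:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal sketch of version-number-based synchronization with queue merging.
// Each queue entry holds a version number and the changed key-value pairs.
public class VersionedUpdateQueue {
    static class Entry {
        final long version;
        final Map<String, String> changes;
        Entry(long version, Map<String, String> changes) {
            this.version = version;
            this.changes = changes;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final int maxLength;
    private long latestVersion = 0;

    VersionedUpdateQueue(int maxLength) { this.maxLength = maxLength; }

    // Record one data change under a serially increasing version number.
    public synchronized void append(Map<String, String> changes) {
        queue.addLast(new Entry(++latestVersion, changes));
        if (queue.size() > maxLength) merge();
    }

    // Diff for a replica that last synchronized at sinceVersion: union of all later entries.
    public synchronized Map<String, String> diffSince(long sinceVersion) {
        Map<String, String> diff = new HashMap<>();
        for (Entry e : queue) {
            if (e.version > sinceVersion) diff.putAll(e.changes);
        }
        return diff;
    }

    // Merge entries pairwise: the larger version wins, data sets are unioned.
    // This halves the queue length at the cost of a slightly redundant (degraded) Diff.
    private void merge() {
        List<Entry> old = new ArrayList<>(queue);
        queue.clear();
        for (int i = 0; i < old.size(); i += 2) {
            if (i + 1 < old.size()) {
                Map<String, String> merged = new HashMap<>(old.get(i).changes);
                merged.putAll(old.get(i + 1).changes);
                queue.addLast(new Entry(old.get(i + 1).version, merged));
            } else {
                queue.addLast(old.get(i));
            }
        }
    }
}
```

`diffSince` is idempotent: calling it again with the same version simply recomputes the same (or a slightly larger, merged) Diff, so a failed synchronization never corrupts the next one.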

Merkle Tree Synchronization - Dataset Comparison Algorithm

Version-number-based synchronization follows an idea similar to a RedoLog: record the history of changes and obtain the Diff by replaying the unsynchronized portion of that history. Because recording an ever-growing RedoLog is costly, the merge strategy degrades the original log. For batch or micro-batch updates, version-number-based synchronization works well; conversely, if data is updated in real time, a large number of RedoLog entries appear and degrade quickly, hurting synchronization efficiency.

The Merkle Tree synchronization algorithm takes a different approach: simply put, it obtains the Diff by directly comparing the two data sets each time. Consider the naive algorithm first: the memory copy sends the hash of every data item to the data source, the data source compares them across the entire data set, and items whose hashes differ are synchronized. This computes the Diff between the two data sets exactly, but the obvious problem is that transmitting the hash of every item each time may cost no less than simply transmitting a bit more data. The Merkle Tree synchronization algorithm uses the Merkle Tree data structure to optimize this comparison.

In simple terms, a Merkle Tree organizes the hashes of the whole data set into a tree. Each leaf node holds the hash of one item (or one group of items), and each internal node holds the hash of its children's hashes, describing the overall hash of the data contained in the subtree rooted at it. Clearly, ignoring hash collisions, if two Merkle Tree root nodes are identical, the two data sets are identical.

The Merkle Tree synchronization protocol is initiated by the replica, which sends its root node value to the data source. If it matches the data source's root hash, no data has changed and synchronization is complete. Otherwise, the data source sends the hashes of the root's children back to the replica for recursive comparison; for hashes that differ, the comparison continues down to the leaf nodes, which pinpoints exactly which data has changed. Taking a binary tree as an example, synchronization completes after at most log N rounds of interaction.
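A minimal sketch of the tree construction and comparison, assuming the data set is partitioned into a power-of-two number of buckets and both trees are available locally (a real protocol would exchange one level of hashes per network round rather than whole trees):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// A minimal sketch of Merkle-tree comparison over a fixed number of data buckets.
// Each leaf hashes one bucket of the data set; each inner node hashes its two children.
public class MerkleSketch {
    // Build a complete binary Merkle tree in array form over bucket hashes; index 1 is the root.
    static byte[][] buildTree(byte[][] bucketHashes) throws NoSuchAlgorithmException {
        int leaves = bucketHashes.length;               // assumed to be a power of two
        byte[][] tree = new byte[2 * leaves][];
        System.arraycopy(bucketHashes, 0, tree, leaves, leaves);
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        for (int i = leaves - 1; i >= 1; i--) {
            md.reset();
            md.update(tree[2 * i]);
            md.update(tree[2 * i + 1]);
            tree[i] = md.digest();                       // inner node = hash of its two children
        }
        return tree;
    }

    // Recursively walk both trees; where hashes differ, descend until a leaf bucket is reached.
    static void findDiff(byte[][] replica, byte[][] source, int node, int leaves, List<Integer> out) {
        if (Arrays.equals(replica[node], source[node])) return; // identical subtree: nothing to sync
        if (node >= leaves) {                                    // leaf node: this bucket changed
            out.add(node - leaves);
            return;
        }
        findDiff(replica, source, 2 * node, leaves, out);
        findDiff(replica, source, 2 * node + 1, leaves, out);
    }
}
```

The bucket indices collected in `out` identify the data that must be re-pulled from the data source.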

2.3.2 Client Caching Technology

When the data is too large to fit entirely in memory, hot and cold data are clearly separated, and strict data freshness is not required, businesses commonly use a client cache. A centralized client cache implementation is an extension of the feature service. General cache protocols and usage are not covered here; instead, from the business perspective of an online feature system, here are some thoughts and experience in several directions.

Interface generalization - separation of cache logic and business

A feature system must satisfy all kinds of business needs, so its interfaces are inevitably rich. By data meaning there are user, merchant, and product interfaces, among others; by transport protocol there are Thrift and HTTP; by calling style there are synchronous and asynchronous; by data organization there are single values, Lists, Maps, nested structures, and so on. A good architecture separates data processing from business as much as possible, abstracts the parts common to every interface, implements caching once, and lets multiple interfaces benefit through reuse. The following uses the synchronous and asynchronous interfaces as an example of client interface generalization.

The synchronous interface has only one step:

  1. Make a request to the server and get the result.

The asynchronous interface is divided into two steps:

  1. Make a request to the server to get a Future instance.
  2. Make a request to the Future instance to get the data.

The only difference between synchronous and asynchronous data processing is the ordering of the steps, so we just need to sort out the execution order of each step. After introducing the cache, the two data processing flows compare as follows:

In the figure, processing boxes of different colors represent different requests. The asynchronous flow requires two calls from the user to obtain the data: as shown, the steps "update the cache with server data" and "merge server data with cached data" are completed in the second call, whereas in the synchronous flow all steps are completed in the first call. Once the data flow is divided into these sub-steps, synchronous and asynchronous are merely different orderings of the same steps. The two cache steps (search cache, update cache) can therefore be abstracted and decoupled from the rest of the logic.
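A minimal sketch of that decoupling, assuming a hypothetical client with a placeholder remote call; the same `searchCache` and `mergeAndUpdate` steps back both the synchronous and asynchronous interfaces, and only the point at which they run differs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// A minimal sketch of separating cache logic from the transport: the same four steps
// (search cache, request missing keys, merge, update cache) are reused by both interfaces.
public class FeatureClientSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Step 1: search the cache, partitioning keys into hits and misses.
    private Map<String, String> searchCache(List<String> keys, List<String> misses) {
        Map<String, String> hits = new HashMap<>();
        for (String k : keys) {
            String v = cache.get(k);
            if (v != null) hits.put(k, v); else misses.add(k);
        }
        return hits;
    }

    // Steps 3 and 4: merge server data with cached data and refresh the cache.
    private Map<String, String> mergeAndUpdate(Map<String, String> hits, Map<String, String> fromServer) {
        cache.putAll(fromServer);
        Map<String, String> result = new HashMap<>(hits);
        result.putAll(fromServer);
        return result;
    }

    // Synchronous interface: all steps happen in one call.
    public Map<String, String> getSync(List<String> keys) {
        List<String> misses = new ArrayList<>();
        Map<String, String> hits = searchCache(keys, misses);
        return mergeAndUpdate(hits, requestServer(misses));
    }

    // Asynchronous interface: steps 1-2 on the first call, steps 3-4 when the Future completes.
    public CompletableFuture<Map<String, String>> getAsync(List<String> keys) {
        List<String> misses = new ArrayList<>();
        Map<String, String> hits = searchCache(keys, misses);
        return requestServerAsync(misses).thenApply(fromServer -> mergeAndUpdate(hits, fromServer));
    }

    // Hypothetical remote calls; in practice these would be Thrift or HTTP requests.
    private Map<String, String> requestServer(List<String> keys) { return new HashMap<>(); }
    private CompletableFuture<Map<String, String>> requestServerAsync(List<String> keys) {
        return CompletableFuture.completedFuture(new HashMap<>());
    }
}
```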

Data storage - time takes priority over space, client and server kept separate

The client is to the server what the server is to the database, and the idea behind compressing stored data is exactly the same. The specific compression and storage strategies were covered in the data compression section above; here I mainly want to make two points:

First, client-side and server-side compression have different goals because their scenarios differ. Server-side compression serves one-off high-throughput writes and high-concurrency, low-latency record-by-record reads, so it mainly cares about decompression time on read and the compression ratio in storage. The client cache sits at the very top of the data storage stack: since both its reads and writes are high-concurrency, low-latency local memory operations, it demands good compression speed, decompression speed, and data size all at once, and therefore has more trade-offs to weigh.

Second, the client and the server are two completely independent modules. Bluntly put, although we write the client code, it is not part of the service; it is part of the caller's service. The client's data compression should be decoupled from the server as much as possible, and the two data formats should not be coupled together for convenience. The data format used to communicate with the server should be treated as an independent protocol, just as the communication between the server and the database has nothing to do with the database's storage format.

Memory management - the conflict between caching and generational garbage collection

The goal of a cache is to keep hot data (frequently accessed data) in memory so as to improve the cache hit rate. The goal of JVM garbage collection (GC) is to reclaim the memory of objects that have lost their references. The two goals look similar, but subtle differences make them hard to reconcile under high concurrency: cache eviction produces a large amount of memory garbage, making Full GC very frequent. This conflict is not limited to the client; it is a common problem for every JVM on-heap cache. Let's look at the scenario more closely:

As data generated by requests keeps being added to the cache, Young GC happens frequently under high QPS, which steadily promotes the memory occupied by the cache from the young generation to the old generation. When the cache is full, the Least Recently Used (LRU) algorithm evicts cold data, which leaves the cache and becomes garbage. Unfortunately, because of the frequent Young GC, much of that cold data has already been promoted to the old generation, so evicting it produces old-generation garbage and eventually triggers Full GC.

It is precisely because the cache eviction mechanism is at odds with the young generation's GC strategy that eviction produces so much garbage in the old generation. The rate of garbage generation has little to do with the cache size; it is tied to the Young GC frequency and the eviction rate of the heap cache, and both are positively correlated with QPS. The heap cache thus effectively becomes a garbage pipeline into the old generation: the higher the QPS, the faster the garbage is produced!

Therefore, for high-concurrency cache applications we should avoid the JVM's generational memory management, or put differently, the overhead and efficiency of GC-based memory reclamation cannot meet the memory management needs of high-concurrency scenarios. Given the JVM's mandatory memory management, we can serialize objects and store them off-heap to bypass it, using third-party technologies such as Ehcache and BigMemory, or modify the JVM's underlying implementation (similar to Taobao's earlier practice) so that objects stay on the heap but are exempt from GC.
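A minimal sketch of the off-heap idea using direct ByteBuffers; it only shows the serialize-and-move-off-heap step, while real products such as Ehcache/BigMemory add slab allocation, eviction, and native memory reclamation on top:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A minimal sketch of off-heap caching: values are serialized into direct ByteBuffers so that
// the cached bytes live outside the JVM heap and are invisible to the garbage collector.
public class OffHeapCacheSketch {
    // Only the small key index lives on the heap; the value bytes live in native memory.
    private final Map<String, ByteBuffer> index = new ConcurrentHashMap<>();

    public void put(String key, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocateDirect(bytes.length); // off-heap allocation
        buf.put(bytes).flip();
        index.put(key, buf);
    }

    public String get(String key) {
        ByteBuffer buf = index.get(key);
        if (buf == null) return null;
        ByteBuffer copy = buf.duplicate();          // read without moving the shared position
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The GC then only sees the small key index and thin buffer handles instead of the cached values themselves, so cache churn no longer floods the old generation with garbage.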

3. Conclusion

This article has introduced some technical points of online feature systems. Starting from the requirements of high concurrency, high throughput, big data, and low latency, and taking some real feature systems as prototypes, it proposed some design ideas for online feature systems. As noted above, the boundary of a feature system is not limited to data storage and retrieval: data import job scheduling, real-time features, feature computation and production, data backup, disaster recovery, and so on can all be considered part of it. This is the first in a series of articles on online feature systems; our feature system keeps evolving in the face of new requirements and challenges, and there will be more practical experience to share later. One person's view is bound to contain omissions and biases, but stones from other hills may serve to polish jade; if it offers architects some ideas when facing their own business, it will have served its purpose.
