Spring Festival Event: Technical Solution for Peak Reward Distribution

Author: Zhang Jian

1. Background

The 2022 Spring Festival event launched on 8 ByteDance apps and included several activities, such as red envelope rain, New Year card collection, and a fireworks show. All three produce high-peak burst traffic. The red envelope rain alone gives hundreds of millions of cash rewards to tens of millions or even hundreds of millions of users within 10 minutes, with most requests concentrated in the first 3 minutes. At the start of the project, red envelope rain was the largest source of traffic, with an estimated peak of 1.8 million QPS for sending red envelopes.

To protect user experience, campaign results, and fund security, the red envelope rain system must be extremely stable. Its design cannot depend strongly on any external system: in the extreme case, as long as the red envelope rain service itself is available, user requests must still be processed and answered normally. The reward system, a downstream service of the red envelope system, is responsible for crediting rewards to users' accounts. It must absorb up to 1.8 million QPS of reward issuance requests, keep the user experience intact when failures occur, and guarantee that every reward is eventually credited, with no over-issuance or under-issuance.

2. Technical Challenges

2.1 High peak flow

There will be 7 red envelope rains on New Year's Eve, one every hour from 12:00, and the card collection and fireworks show will start at 19:30. Around 20:00 that night, the reward traffic of the red envelope rain, card collection, and fireworks show will overlap and may exceed 2 million QPS. The downstream asset middle office service can only handle 300,000 QPS for cash red envelopes and 400,000 QPS for coupons. The reward system therefore has to shave the peak and credit rewards asynchronously so that downstream services are not overloaded.

2.2 Many types of rewards

In addition to cash red envelopes, the card collection and fireworks show scenes issue more than 10 kinds of coupons, physical rewards, avatar pendants, and so on. Different coupons are issued by different downstream systems, each with its own throughput; some can only handle 2,000 TPS. When the reward system performs peak shaving and flow control, the flow control threshold of each reward type must be set individually according to the throughput of its downstream system. When downstream capacity is limited, cash must be credited first.

2.3 High system reliability

After introducing the message queue for asynchronous reward distribution, it is necessary to ensure the reliable delivery and reliable consumption of reward events as much as possible. Any reward must be recorded in the account eventually, and the stability and disaster recovery of the message queue cluster must also be taken into account.

When an internal service fails or reward events pile up in the message queue, users must not notice: they should still see the reward flow (transaction records) on the activity wallet page and be able to withdraw cash at any time. Besides crediting rewards by consuming reward events, the system therefore also needs a way for a user's withdrawal to trigger forced crediting. This path must be safe and reliable, so that it cannot be exploited by black-market attackers to cause financial losses.

3. Technical solution

Because the Spring Festival activities combine high peak traffic with strict stability requirements, the overall solution is built on message-queue peak shaving and asynchronous request processing. The general process of reward distribution is as follows:

On the production side of reward events, in order to minimize the development costs of upstream access parties, the reward system provides a reward SDK based on the characteristics of different access scenarios, and defines a simple and clear reward distribution interface for access parties to choose from. The reliable delivery of reward events is guaranteed by the SDK. The reward event MQ uses two message queues in the company, ByteMQ and RocketMQ, to prevent the entire system from being unavailable due to the downtime of a single message queue cluster.

On the reward event consumption side, a consumer service is created for each Topic, and the four consumers have the same functions. The consumer service guarantees reliable message consumption and consumption rate limit.

Except for incentive coins, all types of rewards are issued through the asset middle office service, which calls the various downstream systems. During the Spring Festival event, the asset middle office did not yet support peak shaving of reward requests, so this had to be done upstream, in the reward system. In business terms, one order number can issue only one type of reward, and only once. Because data is isolated between the asset middle office and the incentive middle office, the reward system has to guarantee idempotent issuance for a single order number across the two services.

3.1 Reward SDK Design

The SDK runs in the access party service in an "embedded" code manner, which can avoid the latency and performance consumption caused by RPC-based network transmission, request data serialization, and return data deserialization. Although the overall latency and performance of the SDK are better than the RPC method, there are still very high requirements for the stability, performance consumption, and interface response latency of the SDK itself. Taking the red envelope rain scenario as an example, the awarding interface needs to return within 50ms. If the response time exceeds 50ms, it will increase the processing time of the entire activity gameplay interface, affect the throughput of the red envelope rain service, and ultimately affect the user experience of participating in the Spring Festival event.

The reward SDK realizes the generation and storage of reward tokens and the reliable delivery of reward events. The interface design provides customized interfaces for different access scenarios to minimize the user's understanding and access costs and shorten the development cycle.

In order to ensure that the SDK code structure is clear and has high scalability and maintainability, a layered design is used within the SDK at the code structure level, which is divided into an external interface layer, an internal interface layer, and an internal implementation layer.

3.1.1 External interface layer

The external interface layer defines the interfaces exposed to users. In addition to initialization, deinitialization, and the general asynchronous award interface, it provides customized interfaces for red envelope rain, the fireworks show, and card collection. The general asynchronous award interface has the same definition as the asynchronous award interface of the reward RPC service, so the access party can migrate between calling the RPC interface and issuing awards through the SDK, in either direction, at low cost.

The customized interface combines the characteristics of the usage scenario, solidifies common parameters such as activity ID, scenario ID, reward type, etc., reduces the number of interface input parameters, and makes the function name semantics clearer, which can further reduce the cost of use for the access party and improve the readability and maintainability of the access party's code. For some scenarios, it also undertakes the splicing of global idempotent IDs.

In addition to user information (user ID, device ID, and AppID) and reward information (reward type, value), the award request must carry a globally unique ID as the order number, which is used to achieve idempotency. The order number is spliced together by the access party from the activity information and user information. All interfaces allow the caller to write extended fields (key-value pairs in Map format) to store business-specific information.
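A customized interface of this kind can be sketched as a thin wrapper over the general interface. The sketch below is illustrative only: `BonusRequest`, `sendBonus`, `SendRainCash`, the activity and scene strings, and the order number format are all hypothetical and are not the SDK's real names.

```go
package main

import "fmt"

// BonusRequest is a hypothetical request for the general award interface.
type BonusRequest struct {
	TradeNo   string // globally unique order number, used for idempotency
	UserID    int64
	DeviceID  int64
	AppID     int32
	BonusType string
	Amount    int64
	Extra     map[string]string // business-defined extension fields
}

// sendBonus stands in for the SDK's general asynchronous award interface.
func sendBonus(req BonusRequest) error { return nil }

// BuildTradeNo splices activity and user information into an order number.
// The format is an assumption; any deterministic, collision-free scheme works.
func BuildTradeNo(activity, scene string, round int, userID int64) string {
	return fmt.Sprintf("%s:%s:%d:%d", activity, scene, round, userID)
}

// SendRainCash is a hypothetical customized interface for red envelope rain.
// It fixes the activity, scene, and reward type so callers pass fewer parameters.
func SendRainCash(userID, deviceID int64, appID int32, round int, cents int64) (BonusRequest, error) {
	req := BonusRequest{
		TradeNo:   BuildTradeNo("sf2022", "red_envelope_rain", round, userID),
		UserID:    userID,
		DeviceID:  deviceID,
		AppID:     appID,
		BonusType: "cash",
		Amount:    cents,
	}
	return req, sendBonus(req)
}

func main() {
	req, _ := SendRainCash(42, 7, 1, 3, 188)
	fmt.Println(req.TradeNo)
}
```

Because the order number is derived only from the activity, scene, round, and user, a retried request produces the same order number and can be deduplicated downstream.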

3.1.2 Internal interface layer

The internal interface layer provides a general asynchronous reward distribution interface (SendBonus), a Token generation and storage interface (GenBonusToken), an initialization interface, and a deinitialization interface. The external interface is differentiated based on the internal interface and provides more detailed functions. The internal interface layer shields the internal implementation details from the upper layer.

Taking the asynchronous distribution interface SendBonus as an example, it combines parameter checking, metric instrumentation, virtual queue selection, construction and sending of the reward message, and generation and storage of the reward token. Once parameter verification passes, SendBonus returns the reward token for the upper-level caller to use (usually by returning it to the front end and client).

 /*
SendBonus sends a reward event asynchronously and returns the reward token.
@act   activity information
@user  user information
@bonus reward information
*/
func SendBonus(ctx context.Context, act Activity, user User, bonus *BonusContent) (string, error) {
	// Parameter check
	if err := CheckParams(act, user); err != nil {
		// Output error log and monitor abnormal requests
		return "", err
	}

	// Check if the reward type is legal
	cfg, err := CheckBonus(bonus)
	if err != nil {
		// Output error log and monitor abnormal requests
		return "", err
	}

	// Construct reward message
	message := &event.BonusEvent{...}

	// SendEvent selects the queue based on the reward attributes
	if err = queue.SendEvent(ctx, message); err != nil {
		// Even if delivery fails, a token is still generated and returned with the error
		return GenBonusToken(ctx, act, user, bonus, true), err
	}

	// Construct and return reward Token
	return GenBonusToken(ctx, act, user, bonus, true), nil
}

3.1.3 Internal Implementation Layer

The internal implementation layer mainly includes two modules: reward token and virtual queue. The token module is responsible for the generation, storage and query of tokens; the queue module is responsible for the reliable delivery of messages.

A. Token Module

Within the activity system as a whole, the reward system issues the real rewards by consuming reward events (asynchronous messages). The Token mechanism is introduced so that, when a failure occurs inside the reward system or the actual crediting is delayed, the user experience stays intact: users can still see the reward flow on the activity page and can still use their rewards (withdraw cash, use coupons, and so on). A Token is the user's credential for a reward and corresponds one-to-one with a reward event. The generation and circulation process of Token is shown in the figure below:

Token data structure and encryption and decryption

The internal data structure of the token is defined using Protobuf, which serializes and deserializes faster than JSON and reduces the size of the serialized data by 50%. The token is returned to the client and saved locally. To prevent black-market attackers from parsing tokens, forging data, and maliciously calling server interfaces, the token data must be encrypted. The Protobuf-serialized plaintext of the token object is encrypted with the company's KMS tool, and the ciphertext is then Base64-encoded for network transmission and local storage on the client. Decryption runs the steps in reverse: Base64-decode, decrypt with the KMS tool, and deserialize the resulting plaintext with Protobuf to obtain the Token object.

The token data content is as follows:

 syntax = "proto3";

message BonusToken {
	string TradeNo = 1;    // Order number, globally unique, used for idempotence
	int64 UserID = 2;      // The UID in the APP at the time of awarding
	string Activity = 3;   // Activity
	string Scene = 4;      // Scene
	int64 AwardType = 5;   // Award type
	int32 AwardCount = 6;  // Award value
	int64 AwardTime = 7;   // Award issuance timestamp
	string Desc = 8;       // Reward text
}
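The serialize, encrypt, Base64-encode pipeline described above can be sketched with the Go standard library alone. Two substitutions are assumptions made purely so the example is self-contained: `encoding/json` stands in for Protobuf, and AES-GCM with a locally supplied key stands in for the company KMS tool, whose API is not described in this document. The struct keeps only a few of the token fields.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
)

// BonusToken mirrors a subset of the token fields.
type BonusToken struct {
	TradeNo    string
	UserID     int64
	AwardType  int64
	AwardCount int32
}

// newAEAD builds an AES-GCM cipher; it is a stand-in for the KMS tool.
func newAEAD(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// EncodeToken serializes, encrypts, and Base64-encodes a token.
func EncodeToken(key []byte, tok BonusToken) (string, error) {
	plain, err := json.Marshal(tok) // JSON stands in for Protobuf here
	if err != nil {
		return "", err
	}
	aead, err := newAEAD(key)
	if err != nil {
		return "", err
	}
	nonce := make([]byte, aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return "", err
	}
	sealed := aead.Seal(nonce, nonce, plain, nil) // layout: nonce || ciphertext
	return base64.StdEncoding.EncodeToString(sealed), nil
}

// DecodeToken reverses EncodeToken: Base64-decode, decrypt, deserialize.
func DecodeToken(key []byte, s string) (BonusToken, error) {
	var tok BonusToken
	raw, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return tok, err
	}
	aead, err := newAEAD(key)
	if err != nil {
		return tok, err
	}
	if len(raw) < aead.NonceSize() {
		return tok, errors.New("token too short")
	}
	nonce, ct := raw[:aead.NonceSize()], raw[aead.NonceSize():]
	plain, err := aead.Open(nil, nonce, ct, nil)
	if err != nil {
		return tok, err
	}
	err = json.Unmarshal(plain, &tok)
	return tok, err
}

func main() {
	key := []byte("0123456789abcdef0123456789abcdef") // 32 bytes selects AES-256
	s, _ := EncodeToken(key, BonusToken{TradeNo: "T1", UserID: 42, AwardCount: 188})
	tok, err := DecodeToken(key, s)
	fmt.Println(tok.TradeNo, err)
}
```

An authenticated cipher is used in the sketch so that a forged or modified ciphertext fails to decrypt, which is the property the illegal-token check relies on.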

Token Storage

Token storage is a typical scenario with more writes and fewer reads. The underlying storage needs to directly carry the peak traffic of award issuance (estimated 3.5 million QPS, and multiple awards will be issued in one request in some scenarios). Users will read the storage only after entering the wallet page (estimated 400,000 QPS). The magnitude of read and write requests is quite different. The validity period of the data is short, and the reward can be deleted after it is actually credited. The write scenario is to insert a single token, and the read scenario is to read the token list.

Tokens are mainly generated by red envelope rain, card collection and fireworks. The number of rewards for red envelope rain and card collection has a clear upper limit. In the fireworks game, users can receive rewards as fast as every 30 seconds. There is no limit on the number of times users can receive rewards. In theory, a single user can generate 500 tokens in the entire fireworks event.

Based on the estimated online traffic, read-write model and activity characteristics, it was decided to use Redis as the underlying storage, Hash as the data structure, the user's ActID as the key of the Hash data, the Token's order number TradeNo as the Hash Field, and the serialized plain text of the Token as the Hash Value.
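The Hash layout can be sketched as follows. To keep the example self-contained, a small in-memory type stands in for Redis and models only the two hash commands the layout needs (HSET for the single-token write, HGETALL for the list read). The `HashStore` interface, the `bonus_token:<userID>` key format, and the use of a plain user ID as the key are assumptions for illustration.

```go
package main

import (
	"fmt"
	"strconv"
)

// HashStore models the two Redis hash commands used by the token layout.
type HashStore interface {
	HSet(key, field string, value []byte)
	HGetAll(key string) map[string][]byte
}

// memStore is an in-memory stand-in for Redis.
type memStore map[string]map[string][]byte

func (m memStore) HSet(key, field string, value []byte) {
	if m[key] == nil {
		m[key] = map[string][]byte{}
	}
	m[key][field] = value
}

func (m memStore) HGetAll(key string) map[string][]byte { return m[key] }

// tokenKey builds the per-user hash key (hypothetical format).
func tokenKey(userID int64) string {
	return "bonus_token:" + strconv.FormatInt(userID, 10)
}

// SaveToken writes one token: field = order number, value = serialized token.
func SaveToken(s HashStore, userID int64, tradeNo string, serialized []byte) {
	s.HSet(tokenKey(userID), tradeNo, serialized)
}

// ListTokens reads the user's whole token list in one call.
func ListTokens(s HashStore, userID int64) map[string][]byte {
	return s.HGetAll(tokenKey(userID))
}

func main() {
	store := memStore{}
	SaveToken(store, 42, "T1", []byte("token-1"))
	SaveToken(store, 42, "T2", []byte("token-2"))
	SaveToken(store, 42, "T1", []byte("token-1")) // a retry overwrites the same field
	fmt.Println(len(ListTokens(store, 42)))
}
```

Using the order number as the Hash Field means a repeated write for the same order overwrites the existing entry instead of creating a duplicate.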

Token Service

The Token service provides interfaces for querying the user token list and verifying the validity of encrypted tokens. Depending on whether the token ciphertext can be decrypted normally and whether the decrypted token exists in Redis, the token validity verification interface returns three results:

  • Illegal Token: The ciphertext cannot be decrypted
  • Unknown Token: The ciphertext can be decrypted, but no record is stored
  • Legal Token: The ciphertext can be decrypted and the records are stored
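The three outcomes can be expressed as a small decision function. This is a minimal sketch: `Decrypter`, `Lookup`, `CheckToken`, and the `enc:` prefix used by the fake decrypter are hypothetical and only illustrate the decision order.

```go
package main

import (
	"errors"
	"fmt"
)

// TokenStatus is the result of the validity check.
type TokenStatus int

const (
	IllegalToken TokenStatus = iota // ciphertext cannot be decrypted
	UnknownToken                    // decrypts, but no record is stored
	LegalToken                      // decrypts and a record is stored
)

// Decrypter returns the order number inside a ciphertext (hypothetical shape).
type Decrypter func(cipherText string) (tradeNo string, err error)

// Lookup reports whether a token with this order number is stored server-side.
type Lookup func(tradeNo string) bool

// CheckToken classifies a ciphertext into one of the three results.
func CheckToken(cipherText string, decrypt Decrypter, stored Lookup) TokenStatus {
	tradeNo, err := decrypt(cipherText)
	if err != nil {
		return IllegalToken
	}
	if !stored(tradeNo) {
		return UnknownToken
	}
	return LegalToken
}

// fakeDecrypt treats "enc:<tradeNo>" as a valid ciphertext, for demonstration only.
func fakeDecrypt(c string) (string, error) {
	if len(c) < 4 || c[:4] != "enc:" {
		return "", errors.New("cannot decrypt")
	}
	return c[4:], nil
}

func main() {
	stored := func(tradeNo string) bool { return tradeNo == "T1" }
	fmt.Println(CheckToken("garbage", fakeDecrypt, stored) == IllegalToken)
	fmt.Println(CheckToken("enc:T9", fakeDecrypt, stored) == UnknownToken)
	fmt.Println(CheckToken("enc:T1", fakeDecrypt, stored) == LegalToken)
}
```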

The reward SDK does not retry when writing a token to Redis, so in a very small number of cases the token is not saved. To protect funds and prevent malicious black-market attacks, an unknown token cannot be used for forced crediting even though it can be decrypted.

Token Usage

After the user participates in the event and receives a reward, the activity front end saves the token by calling the client JSB. When the user views the reward flow, the activity wallet page front end reads the local token list through JSB and sends it along with its request to the asset middle office service. The asset middle office service decrypts the tokens with TokenSDK, and at the same time asks the Token service for the server-side token list and merges the two. It then removes tokens that have already been credited from the merged list, inserts the entries that have not yet been credited into the flow returned to the user, and corrects the activity wallet balance, so that the user's rewards are visible in time.
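The merge step can be sketched as a deduplicating union keyed by order number, followed by a filter on what has already been credited. The `Token` struct and `PendingTokens` function are hypothetical simplifications, not the asset middle office's real types.

```go
package main

import "fmt"

// Token is a simplified decrypted token.
type Token struct {
	TradeNo string
	Amount  int64
}

// PendingTokens merges client and server token lists by order number and
// drops tokens whose rewards are already credited. The result is what still
// has to be shown to the user as not-yet-credited flow.
func PendingTokens(client, server []Token, credited map[string]bool) []Token {
	seen := make(map[string]bool)
	var pending []Token
	for _, list := range [][]Token{client, server} {
		for _, t := range list {
			if seen[t.TradeNo] || credited[t.TradeNo] {
				continue
			}
			seen[t.TradeNo] = true
			pending = append(pending, t)
		}
	}
	return pending
}

// Sum returns the total amount to add to the displayed wallet balance.
func Sum(tokens []Token) int64 {
	var total int64
	for _, t := range tokens {
		total += t.Amount
	}
	return total
}

func main() {
	client := []Token{{"T1", 100}, {"T2", 50}}
	server := []Token{{"T2", 50}, {"T3", 30}}
	credited := map[string]bool{"T1": true}
	pending := PendingTokens(client, server, credited)
	fmt.Println(len(pending), Sum(pending))
}
```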

When users withdraw money from the activity wallet page, they will also bring the local Token of the client to the asset middle office service. The asset middle office service will force the legal Token that has not been recorded to be recorded to ensure that users can complete the withdrawal operation.

The role of client and server tokens

When the message queue that the reward system relies on fails and cannot be written or consumed, or when there is a delay in the actual receipt of rewards due to peak load shaving and flow control, both tokens can ensure lossless user experience to a certain extent.

The client token is transmitted through the network between the user device and the backend service and is stored in the user device. The server token is transmitted through the internal network and stored in the centralized Redis storage. The two tokens back up each other. When the local token is not available, the server token can be relied on. When the server token service fails, the client token can still guarantee the user experience.

This event was launched simultaneously on 8 ByteDance apps. The Token service can also ensure that users have a consistent experience on different apps and even on different devices.

B. Queue module

The Queue module is responsible for providing a "reliable" message delivery service. The externally exposed SendEvent function can select the corresponding virtual queue to send messages based on the reward and provide unified monitoring capabilities.

 func SendEvent(ctx context.Context, msg *BonusEvent) error {
	// Select a dedicated virtual queue based on reward information
	queue := GetQueue(msg.Activity, msg.Scene, msg.BonusType)
	// Serialize the event with Protobuf before sending
	data, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	return queue.Send(ctx, msg.UserID, msg.UniqueID, data)
}

Virtual Queue is an encapsulation of ByteMQ and RocketMQ in the company. The usage details of the two message queue Producer-SDK are shielded through code encapsulation, and the two MQs are supported for mutual backup to improve the disaster recovery capability of the entire system. The class diagram of the virtual queue is as follows:

The Send method of the virtual queue can dynamically adjust the usage ratio of the primary and backup producers according to the user ID, providing automatic disaster recovery capabilities in the event of a single producer failure.

 func (q *Queue) Send(ctx context.Context, uid int64, tradeNo string, data []byte) error {
	var err error
	if (uid % 100) < GetQueueRatio(q.Name()) {
		// This share of users goes to Master first and falls back to Backup on failure
		err = q.Master.Send(ctx, tradeNo, data)
		if err != nil {
			err = q.Backup.Send(ctx, tradeNo, data)
		}
	} else {
		// The remaining users go to Backup first and fall back to Master on failure
		err = q.Backup.Send(ctx, tradeNo, data)
		if err != nil {
			err = q.Master.Send(ctx, tradeNo, data)
		}
	}
	return err
}

When the asynchronous batch sending function of the RocketMQ or ByteMQ SDK is used, the Producer hides the differences between the failure callbacks of the two SDKs and returns failed messages to the upper layer through a unified channel. The Retry logic of the virtual queue reads the failed messages of the primary and standby Producers and retries them, alternating between the two Producers. As long as the service process does not exit abnormally, every message is guaranteed to be sent successfully. On a normal exit, the Close method waits for all messages to be processed before returning.

Message queue topic is configurable

The virtual queue uses two message queues, Master and Backup, and decouples them from the underlying message queue type through code abstraction. In a real online environment, in order to achieve disaster recovery, the Master and Backup of a single virtual queue need to use message queue Topics of different types or different physical clusters.

During the Spring Festival event, the R&D and operation teams of ByteMQ and RocketMQ respectively provided a dedicated cluster for the event and provided key operation and maintenance support. The reward system applied for two topics in the event clusters of ByteMQ and RocketMQ respectively. Based on the four topics, three virtual queues were built on the upper layer.

The Producer instance of a Topic can be reused in different queues. In the figure above, ByteMQ producer S is used as the Master in the Special Queue and as the Backup in the Express Queue; RocketMQ producer B is used as the Backup in both the Massive and Special Queue.

The message queue topic used within the reward SDK is configured in the dynamic configuration TCC. The mapping relationship between the virtual queue and the Producer instance can also be configured through TCC. The code is decoupled from the message queue cluster and topic. The message queue topic can be easily replaced during development, testing, and online operation.

The virtual queue corresponding to the reward is configurable

The correspondence between reward types and virtual queues is configured in TCC. Different reward types can dynamically specify the virtual queue to be sent. If no configuration is made, the Massive virtual queue is used by default. In the SendEvent method, call GetQueue to select the virtual queue for distribution. During the Spring Festival event, the Massive virtual queue carries cash rewards issued in all scenarios; the Special virtual queue carries coupons issued in all scenarios; and the Express virtual queue carries incentive gold coin rewards in all scenarios.
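A minimal sketch of the lookup with its default is shown below. The map stands in for the TCC configuration, and the reward type strings and the `QueueNameFor` function are hypothetical; only the three queue names and the default come from the text above.

```go
package main

import "fmt"

// defaultQueue is used when a reward type has no configured queue.
const defaultQueue = "Massive"

// QueueNameFor returns the virtual queue for a reward type. cfg stands in
// for the mapping loaded from dynamic configuration.
func QueueNameFor(bonusType string, cfg map[string]string) string {
	if name, ok := cfg[bonusType]; ok && name != "" {
		return name
	}
	return defaultQueue
}

func main() {
	cfg := map[string]string{
		"coupon":    "Special",
		"gold_coin": "Express",
	}
	fmt.Println(QueueNameFor("cash", cfg))      // not configured, falls back to Massive
	fmt.Println(QueueNameFor("coupon", cfg))    // Special
	fmt.Println(QueueNameFor("gold_coin", cfg)) // Express
}
```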

Asynchronous batch sending of messages

The producer SDKs of both ByteMQ and RocketMQ support synchronous sending and asynchronous batch sending. The P99 latency of RocketMQ synchronous sending is 20 ms, while the P99 latency of ByteMQ synchronous sending is on the order of seconds. For the same message volume, RocketMQ uses significantly more CPU than ByteMQ. In asynchronous mode, the producer SDK starts a coroutine that flushes the buffer either on a timer or when the number of buffered messages reaches a threshold; both the timer interval and the buffer threshold can be configured at initialization. Batch sending reduces the number of requests producers make to the message queue service. Assuming 100 messages per batch, the QPS seen by the message queue service drops by up to a factor of 100, which greatly reduces the load on the message queue cluster.

In order to reduce the response delay of the reward event sending interface and keep the message queue cluster load at a low level, the asynchronous batch sending mode is used in large-traffic reward distribution scenarios, and ByteMQ is configured to carry the main traffic.
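The flush-on-threshold-or-timer behaviour of asynchronous batch sending can be illustrated with a small batcher. This is a generic sketch, not the ByteMQ or RocketMQ producer SDK: `Batcher`, `NewBatcher`, and the `flush` callback are hypothetical, and the callback stands in for one request to the broker.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Batcher buffers messages and flushes them when the buffer reaches
// maxBatch or when the timer fires, whichever happens first.
type Batcher struct {
	mu       sync.Mutex
	buf      [][]byte
	maxBatch int
	flush    func(batch [][]byte) // one call models one request to the broker
	stop     chan struct{}
	done     chan struct{}
}

func NewBatcher(maxBatch int, interval time.Duration, flush func([][]byte)) *Batcher {
	b := &Batcher{maxBatch: maxBatch, flush: flush,
		stop: make(chan struct{}), done: make(chan struct{})}
	go b.loop(interval)
	return b
}

// Add appends a message and flushes immediately once the threshold is reached.
func (b *Batcher) Add(msg []byte) {
	b.mu.Lock()
	b.buf = append(b.buf, msg)
	var batch [][]byte
	if len(b.buf) >= b.maxBatch {
		batch, b.buf = b.buf, nil
	}
	b.mu.Unlock()
	if batch != nil {
		b.flush(batch)
	}
}

// loop flushes on every tick and once more when the batcher is closed.
func (b *Batcher) loop(interval time.Duration) {
	defer close(b.done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			b.flushPending()
		case <-b.stop:
			b.flushPending()
			return
		}
	}
}

func (b *Batcher) flushPending() {
	b.mu.Lock()
	batch := b.buf
	b.buf = nil
	b.mu.Unlock()
	if len(batch) > 0 {
		b.flush(batch)
	}
}

// Close flushes whatever is still buffered and waits for the loop to exit.
func (b *Batcher) Close() {
	close(b.stop)
	<-b.done
}

func main() {
	requests := 0
	b := NewBatcher(100, time.Hour, func(batch [][]byte) { requests++ })
	for i := 0; i < 250; i++ {
		b.Add([]byte("event"))
	}
	b.Close()
	fmt.Println("messages: 250, broker requests:", requests)
}
```

With a threshold of 100, the 250 messages in `main` reach the broker in 3 requests (two full batches plus the remainder flushed on Close). In real use the `flush` callback must be safe to call from both the caller's goroutine and the timer goroutine.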

3.2 Consumer Design

The peak shaving function of the message queue is implemented by controlling how fast consumers consume. RocketMQ consumption is based on long polling, which combines the advantages of push and pull modes. ByteMQ consumption uses pull mode. A consumer instance controls its consumption speed through how often it pulls messages and how many messages it pulls each time.

In the Spring Festival reward distribution scenario, two kinds of control are needed. First, the total consumption speed across multiple message queues must be adjusted dynamically, so that the downstream reward service, asset middle office service, and incentive middle office service are not overloaded while machine resources are still fully used. Second, the consumption speed of each reward type must be controlled dynamically, with important rewards such as cash credited first.

There are many types of rewards issued in the event, and a message queue topic cannot be assigned separately for each reward. The number of rewards issued by different types varies significantly. Rewards with large issuance volume and high accounting priority have exclusive topics, while rewards with small issuance volume and low accounting priority share a topic. The actual accounting services (downstream services of asset middle office services) of different reward types have different accounting capabilities. The service with the smallest accounting capability can only process 2,000 issuance requests per second. It is necessary to support flexible consumption speed control capabilities in the reward type dimension.

In addition to multi-dimensional speed control, it is also necessary to provide reliable consumption capabilities. Each reward message must be successfully processed at least once, and all rewards must be successfully credited.

Based on the above background, there may be a difference between the message pulling speed (reading messages from the Topic) and the message processing speed (limiting the speed by the reward type, calling the reward system to issue rewards) of the reward consumer service. When the pulling speed is lower than the processing speed, the throughput of the reward service decreases, and the message accumulation time in the Broker becomes longer; when the pulling speed is higher than the processing speed, messages that cannot pass the reward type speed limit will accumulate in the memory of the consumer service process and block consumption. If the difference is significant, the consumer service process may exit due to OOM, affecting the stability of the service. For messages that are limited by the reward type, they need to be re-queued immediately, and the consumer service continues to process subsequent messages. Messages that fail to be processed temporarily due to network fluctuations and other reasons also need to be re-queued to ensure that the messages can be processed successfully in the end.
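The re-queue behaviour for rate-limited and temporarily failing messages can be sketched as follows. For brevity the sketch uses a fixed per-window budget for each reward type rather than a token bucket; `TypeBudget`, `Dispatch`, and the `Msg` struct are hypothetical names, and the limits in `main` are made-up numbers.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg is a simplified reward message.
type Msg struct {
	TradeNo   string
	BonusType string
}

// TypeBudget tracks how many messages of each reward type may still be
// processed in the current window (a fixed-window simplification).
type TypeBudget struct {
	limit map[string]int
	used  map[string]int
}

func NewTypeBudget(limit map[string]int) *TypeBudget {
	return &TypeBudget{limit: limit, used: map[string]int{}}
}

// Allow consumes one unit of budget; types without a limit are not throttled.
func (b *TypeBudget) Allow(bonusType string) bool {
	limit, ok := b.limit[bonusType]
	if !ok {
		return true
	}
	if b.used[bonusType] >= limit {
		return false
	}
	b.used[bonusType]++
	return true
}

// Reset starts a new window.
func (b *TypeBudget) Reset() { b.used = map[string]int{} }

// Dispatch processes messages in order. Messages over their type's limit, or
// whose processing fails, are returned for re-queueing so that they neither
// block later messages nor pile up in consumer memory.
func Dispatch(msgs []Msg, b *TypeBudget, process func(Msg) error) (requeue []Msg) {
	for _, m := range msgs {
		if !b.Allow(m.BonusType) {
			requeue = append(requeue, m)
			continue
		}
		if err := process(m); err != nil {
			requeue = append(requeue, m)
		}
	}
	return requeue
}

func main() {
	budget := NewTypeBudget(map[string]int{"coupon": 2, "cash": 10})
	msgs := []Msg{{"c1", "coupon"}, {"c2", "coupon"}, {"m1", "cash"}, {"c3", "coupon"}, {"m2", "cash"}}
	requeue := Dispatch(msgs, budget, func(m Msg) error {
		if m.TradeNo == "m2" {
			return errors.New("temporary network error")
		}
		return nil
	})
	fmt.Println(requeue)
}
```

In `main`, the third coupon exceeds its limit and the second cash message fails, so both are handed back for re-queueing while the cash message behind the throttled coupon is still processed.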

3.2.1 Consumption speed control

A. Consumption speed limit

A RocketMQ consumer instance can be configured with a single-instance consumption speed and a number of consumption workers at startup, but adjusting the consumption speed dynamically requires restarting the instance. ByteMQ is compatible with the Kafka protocol, and the Golang code uses sarama-cluster (https://github.com/bsm/sarama-cluster) to consume ByteMQ queues. Compared with RocketMQ's SDK, sarama-cluster is simpler and does not provide a single-instance consumption speed limit. A single instance can subscribe to multiple Partitions. Each Partition starts a coroutine to read messages from the Broker, and all Partitions write pending messages to one shared global channel, from which the business code reads messages for processing. The speed limit can therefore only be implemented in the business logic, but in return the consumption speed can be adjusted dynamically without restarting the consumer instance.

Based on the characteristics of sarama-cluster, the single instance rate limiter of ByteMQ consumer is implemented using the Go native rate limiter (golang.org/x/time/rate). The code implementation is as follows:

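 import (
	"context"
	"time"

	"golang.org/x/time/rate"
)

// LimitFetcher returns the currently configured limit in messages per second.
// Its definition is not shown in the original; this signature is inferred from usage.
type LimitFetcher func() int

type Limiter struct {
	Open    bool
	Fetcher LimitFetcher
	inner   *rate.Limiter
	stop    chan struct{}
}

// Wait is called before processing a message; the message is processed after it returns
func (s *Limiter) Wait() {
	if s.Open {
		_ = s.inner.Wait(context.Background())
	}
}

// Loop polls the configured limit every 5 seconds and applies any change
func (s *Limiter) Loop() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for s.Open && s.Fetcher != nil {
		select {
		case <-ticker.C:
			newLimit := s.Fetcher()
			if newLimit != int(s.inner.Limit()) {
				s.inner.SetLimit(rate.Limit(newLimit))
			}
		case <-s.stop:
			return
		}
	}
}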

The native Go rate limiter implements rate limiting with the token bucket algorithm. It does not maintain a timer internally; instead it is evaluated lazily: whenever a token is requested, it computes and updates the number of available tokens from the time elapsed since the last update. It has no external dependencies and is well suited to single-instance rate limiting.
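The lazy refill idea can be shown with a hand-written bucket. This is a simplified sketch for illustration, not the source of golang.org/x/time/rate: `Bucket`, `AllowAt`, and `Simulate` are hypothetical names, and the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// Bucket is a token bucket with no timer: tokens are recomputed from the
// elapsed time whenever a request arrives.
type Bucket struct {
	rate   float64   // tokens added per second
	burst  float64   // bucket capacity
	tokens float64   // tokens available as of last
	last   time.Time // time of the last refill calculation
}

func NewBucket(rate float64, burst int, now time.Time) *Bucket {
	return &Bucket{rate: rate, burst: float64(burst), tokens: float64(burst), last: now}
}

// AllowAt refills lazily up to the capacity, then tries to take one token.
func (b *Bucket) AllowAt(now time.Time) bool {
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// Simulate replays requests arriving at the given millisecond offsets.
func Simulate(rate float64, burst int, offsetsMs []int) []bool {
	start := time.Unix(0, 0)
	b := NewBucket(rate, burst, start)
	out := make([]bool, 0, len(offsetsMs))
	for _, ms := range offsetsMs {
		out = append(out, b.AllowAt(start.Add(time.Duration(ms)*time.Millisecond)))
	}
	return out
}

func main() {
	// 10 tokens per second, capacity 2.
	fmt.Println(Simulate(10, 2, []int{0, 0, 0, 150, 160, 1000, 1000, 1000}))
	// prints [true true false true false true true false]
}
```

The first two requests drain the initial burst, the request at 150 ms succeeds because 1.5 tokens have accrued, and after the long idle gap the bucket refills only up to its capacity of 2 rather than to 8 or more.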

When the rate of the limiter is adjusted dynamically, tokens that were already reserved through the Reserve or Wait interfaces but not yet used are not cancelled, and the blocking time of a Wait call already in progress does not change. After an adjustment, the QPS sent downstream therefore has three sources: requests that were already waiting before the adjustment (blocked in rate.Limiter::Wait()), requests admitted by tokens added at the new rate, and requests admitted by the Burst (bucket capacity). For a short time after the adjustment, the downstream QPS may exceed the expected rate. In burst traffic scenarios, Burst should not be set too large.

 // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)

B. Concurrent consumption

When RocketMQ consumes in an ordered manner, a single Queue can only be assigned one Worker for consumption. Only after the previous message in the current Queue is successfully processed will the next message be processed. The consumption speed is limited by the number of Queues and the processing delay of a single message. When consuming out of order, all Workers share a buffer and randomly consume messages from different Queues. Workers process messages concurrently. The more Workers there are, the faster the consumption speed.

When RocketMQ confirms messages (ACK), the consumer instance batch-commits (BatchCommit) its successful consumption information to the Broker once the number of locally processed messages exceeds a threshold, or once a certain time has passed since the last commit. The batch commit request contains the MsgID, QueueID, and Offset of each message. The Broker provides a message confirmation window mechanism and, on each commit, saves to disk the minimum Offset in the window of the corresponding Queue. If the Broker goes down, messages in the window whose Offset is greater than the one saved on disk are delivered again, so from the consumer's perspective messages that were already confirmed are consumed a second time. RocketMQ therefore cannot guarantee At Most Once, and the message processing logic must be idempotent.

The ByteMQ message confirmation mechanism is relatively simple. The Broker does not provide a message confirmation window: when it receives a Commit request from a consumer instance, it saves the current Offset directly, and messages with a smaller offset are not consumed again. In the consumer instance, the MarkOffset method called by business code records Offset+1 of the confirmed message in memory, and a coroutine submits it to the Broker periodically. If the consumer instance goes down, messages whose Offset has not been submitted are delivered again by the Broker. ByteMQ therefore cannot guarantee At Most Once either, and consumers must make their processing logic idempotent.

When consuming ByteMQ, reading a message from the global channel exposed by sarama-cluster, processing it synchronously, and calling MarkOffset only after it succeeds guarantees in-order consumption. However, synchronous processing severely reduces the consumption speed, because a single instance can only process one message at a time. Starting coroutines to process messages asynchronously allows concurrent processing, and the consumption speed can be raised by adding coroutines, but messages can then be lost if the consumer process exits abnormally or the consumer machine goes down. For example, suppose a message with a larger offset is processed and confirmed (its offset is committed to the Broker) and the consumer crashes before a message with a smaller offset has been processed. The Broker will not deliver the smaller-offset message again, so it is skipped and At Least Once semantics are violated.

// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	c.subs.Fetch(c.client.config.TryWrapTopicByEnv(msg.Topic), msg.Partition).MarkOffset(msg.Offset+1, metadata)
}

To solve the missed-message problem described above, the ByteMQ confirmation mechanism has to be enhanced at the business layer by implementing a message confirmation window in the consumer code. In the consumer process, offsets are appended to a linked list in message order, and a pointer to each linked list node is stored in a HashMap keyed by offset. When a message has been processed successfully, its node is looked up through the HashMap and marked as done. A local coroutine periodically scans from the head of the linked list and submits successfully consumed offsets to the Broker in strict order. This ensures that, under concurrent processing, a message with a larger offset is never confirmed to the Broker ahead of unfinished messages with smaller offsets.
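The confirmation window can be sketched roughly as follows. This is a minimal illustration for a single partition, not the production code: the names AckWindow, Track, Done and Committable are hypothetical, and the periodic coroutine that would call Committable and submit the result to the Broker is left out.

```go
package main

import (
	"container/list"
	"sync"
)

// ackEntry is one in-flight message: its offset and whether processing finished.
type ackEntry struct {
	offset int64
	done   bool
}

// AckWindow is a hypothetical confirmation window for a single partition.
type AckWindow struct {
	mu      sync.Mutex
	pending *list.List              // entries in delivery (offset) order
	index   map[int64]*list.Element // offset -> linked list node
}

func NewAckWindow() *AckWindow {
	return &AckWindow{pending: list.New(), index: make(map[int64]*list.Element)}
}

// Track registers a message when it is read, before it is handed to a worker.
func (w *AckWindow) Track(offset int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.index[offset] = w.pending.PushBack(&ackEntry{offset: offset})
}

// Done marks a message as processed. It does not commit anything by itself.
func (w *AckWindow) Done(offset int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if e, ok := w.index[offset]; ok {
		e.Value.(*ackEntry).done = true
	}
}

// Committable removes the contiguous run of finished entries at the head of
// the list and returns the offset to commit (last finished offset + 1).
// ok is false when the head entry is still in flight.
func (w *AckWindow) Committable() (next int64, ok bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for e := w.pending.Front(); e != nil; e = w.pending.Front() {
		entry := e.Value.(*ackEntry)
		if !entry.done {
			break
		}
		next, ok = entry.offset+1, true
		w.pending.Remove(e)
		delete(w.index, entry.offset)
	}
	return next, ok
}
```

With offsets 10, 11 and 12 tracked, finishing 12 first yields nothing to commit; only after 10 and 11 are done does the window advance past 12. A crash in between makes the Broker redeliver from the oldest unfinished offset, which is why the processing logic must stay idempotent.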

3.2.2 Event processing logic

RocketMQ provides a failure queue and retry capabilities, but ByteMQ has no failure handling mechanism. To smooth out the differences between the two message queues, the event handling method (HandleMessage) must make every effort to process each message successfully, and messages that fail to be processed are written back to a queue (SendEventToBackup).

If a RocketMQ consumer fails several times to re-queue a failed message, it falls back on the failure retry capability provided by the message queue SDK. Because the ByteMQ SDK has no failure handling mechanism, when re-queuing fails several times the message's Offset is still confirmed so that it does not block subsequent messages; the event payload is written to the log so that it can be recovered later.

HandleMessage

// HandleMessage for ByteMQ
func HandleMessage(msg *sarama.ConsumerMessage) error {
	err := DoReward(msg.Context, msg.Value, limiter)
	// Local confirmation only; an asynchronous coroutine submits offsets to the Broker at regular intervals.
	MarkOffset(msg, err)
	return nil
}

// HandleMessage for RocketMQ
func (w wrapper) HandleMessage(ctx context.Context, msg *pb.ConsumeMessage) error {
	return handler.DoReward(ctx, msg.Msg.MsgBody, limiter)
}

type Limiter interface {
	Allow(*BonusEvent) bool
}

func DoReward(ctx context.Context, data []byte, rate Limiter) error {
	bonus := &BonusEvent{}
	if err := proto.Unmarshal(data, bonus); err != nil {
		return err
	}
	// Rate-limit by reward type. A nil limiter means no limit. When the circuit
	// breaker (fuse) is on, Allow returns false and the event is re-queued directly.
	if rate == nil || rate.Allow(bonus) {
		// Synchronously call the reward service to issue the reward
		if err := callReward(ctx, bonus); err == nil {
			return nil
		}
	}
	// Processing failed or was rate-limited: write the event back to a queue
	return SendEventToBackup(ctx, bonus.UniqueID, bonus)
}

SendEventToBackup

func SendEventToBackup(ctx context.Context, tradeNo string, bonus *BonusEvent) error {
	bonus.Retry++ // Increase the retry count
	data, err := proto.Marshal(bonus)
	if err != nil {
		return err
	}
	// Resend with a new partition key
	newPartitionKey := fmt.Sprintf("%s{%d}", bonus.UniqueID, bonus.Retry)
	for _, queue := range instances {
		// Try each of the alternative queues in turn until one accepts the event
		if err = queue.Send(ctx, newPartitionKey, data); err == nil {
			return nil
		}
	}
	// In extreme cases every queue fails: log the payload so it can be recovered from the logs
	logs.CtxError(ctx, "%s", base64.StdEncoding.EncodeToString(data))
	return err
}

3.2.3 Reward Type Speed Limit

Because different reward types are ultimately recorded by different downstream systems, a per-instance rate limit is configured for each reward type. This protects the stability of the downstream systems and reduces the rate-limiting errors they return, along with other wasted calls.

func NewLimiter() *Limiter {
	l := &Limiter{
		m:      sync.Map{},
		ticker: time.NewTicker(5 * time.Second),
	}
	l.loop()
	return l
}

type Limiter struct {
	m      sync.Map // reward type -> *innerLimiter
	ticker *time.Ticker
}

type innerLimiter struct {
	*rate.Limiter
	Fuse bool
}

// Allow reports whether the message may be processed now. When it returns
// false, the message is not processed and re-enters the queue directly.
func (L *Limiter) Allow(event *BonusEvent) bool {
	if event == nil {
		return true
	}
	if v, exist := L.m.Load(GetBonusType(event)); exist {
		if inner, ok := v.(*innerLimiter); ok {
			if inner.Fuse { // The fuse switch is turned on
				return false
			}
			return inner.Allow()
		}
	}
	// No limiter is configured for this reward type
	return true
}

func (L *Limiter) loop() {
	go func() {
		defer Recover()
		L.run()
		for range L.ticker.C {
			L.run()
		}
	}()
}

// Monitor configuration changes and dynamically adjust the rate limits
func (L *Limiter) run() {
	for wt, config := range tcc.GetRateCfg() {
		value, exist := L.m.Load(wt)
		if !exist || value == nil {
			// Create a new rate limiter
			L.m.Store(wt, &innerLimiter{
				Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
				Fuse:    config.Fuse,
			})
			continue
		}

		if inner, ok := value.(*innerLimiter); ok {
			// Update the existing rate limiter
			inner.Fuse = config.Fuse
			if int(inner.Limiter.Limit()) != config.Rate {
				inner.Limiter.SetLimit(rate.Limit(config.Rate))
			}
			continue
		}

		// The stored value has an unexpected type: replace it
		L.m.Delete(wt)
		L.m.Store(wt, &innerLimiter{
			Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
			Fuse:    config.Fuse,
		})
	}
}

func (L *Limiter) Close() {
	if L.ticker != nil {
		L.ticker.Stop()
		L.ticker = nil
	}
}

3.2.4 Coordination of consumption and reward type speed limits

The consumer is like a pipeline: the consumption rate limit is the flow limit of the whole pipeline, and each reward type rate limit is the flow limit of one branch. When the consumption rate is greater than the sum of all type rates, the excess requests are re-queued. To reduce re-queuing, two conditions must hold:

  1. The consumption rate limit and the reward type rate limits are linked: when a type rate limit is adjusted, the consumption rate is adjusted automatically to match.
  2. When the upstream issues rewards, the probability distribution of the reward types matches the type rate limit configuration.
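The two conditions can be illustrated with a small calculation. The sketch below is an assumption-laden model rather than the production logic: TypeRate, ConsumeRate and RequeueRate are hypothetical names, the consumption rate is taken to be the plain sum of the type rates, and a fused type is treated as having a rate of zero.

```go
package main

// TypeRate is a hypothetical per-reward-type configuration entry.
type TypeRate struct {
	Rate int  // allowed calls per second on a single instance
	Fuse bool // true when the circuit breaker for this type is on
}

// ConsumeRate derives the consumer's per-instance rate from the type limits,
// so that adjusting a type limit adjusts the consumption rate as well.
func ConsumeRate(cfg map[string]TypeRate) int {
	total := 0
	for _, c := range cfg {
		if c.Fuse {
			continue // a fused type accepts nothing, so it adds no capacity
		}
		total += c.Rate
	}
	return total
}

// RequeueRate estimates how many messages per second would be re-queued when
// the consumer reads at consumeRate and reward types arrive with the given
// probability mix (the probabilities are expected to sum to 1).
func RequeueRate(consumeRate int, mix map[string]float64, cfg map[string]TypeRate) float64 {
	excess := 0.0
	for typ, p := range mix {
		c, limited := cfg[typ]
		if !limited {
			continue // types without a configured limiter are never rejected
		}
		limit := float64(c.Rate)
		if c.Fuse {
			limit = 0
		}
		if arrival := float64(consumeRate) * p; arrival > limit {
			excess += arrival - limit
		}
	}
	return excess
}
```

For example, with limits of 250 per second for cash and 750 per second for coupons, the derived consumption rate is 1000 per second. A 25%/75% reward mix causes no re-queuing, while a 50%/50% mix pushes 500 cash events per second against a limit of 250, so about 250 events per second are re-queued.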

During the Spring Festival activities, the probability of reward distribution was controlled by the algorithm strategy. In scenarios such as red envelope rain, fireworks display, and card collection prize draw, the probability distribution was in line with expectations, and no re-entry into the queue occurred.

3.3 Reward Service Design

The reward service is responsible for calling the asset middle office service and the incentive middle office service to issue specific rewards. To the upper layer it provides global idempotence guarantees, managed retry of failed requests, budget control and other capabilities.

Since the upstream uses the same idempotent ID to issue different rewards, and the data in different downstream systems is isolated, the reward service needs to store the processing status and results of all reward requests to ensure global idempotence. Issuance requests are stored in Abase, the company's self-developed storage system, and the CAS capability provided by Abase is used for concurrency control over reward issuance, ensuring that the same idempotent ID can only be used for one issuance. The reward type and value of an upstream retry request must be consistent with the original request in order to pass verification and enter the actual issuance process.
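The idempotence check can be sketched as below. Abase's client API is not shown in this article, so the sketch substitutes a hypothetical in-memory casStore with PutIfAbsent and CompareAndSwap operations; Record, Issue and the error values are illustrative names as well.

```go
package main

import (
	"errors"
	"sync"
)

// Status is the processing state stored for an idempotent ID.
type Status int

const (
	Processing Status = iota
	Succeeded
	Failed
)

// Record is what this sketch stores per idempotent ID.
type Record struct {
	BonusType string
	Amount    int64
	Status    Status
}

var (
	ErrMismatch = errors.New("retry does not match the original request")
	ErrInFlight = errors.New("another issuance with this ID is in progress")
)

// casStore is an in-memory stand-in for a KV store with CAS semantics.
type casStore struct {
	mu   sync.Mutex
	data map[string]Record
}

func newCASStore() *casStore { return &casStore{data: make(map[string]Record)} }

// PutIfAbsent stores r when the key is new; otherwise it returns the existing record.
func (s *casStore) PutIfAbsent(key string, r Record) (Record, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.data[key]; ok {
		return cur, false
	}
	s.data[key] = r
	return r, true
}

// CompareAndSwap replaces the record only if it still equals old.
func (s *casStore) CompareAndSwap(key string, old, next Record) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.data[key]; !ok || cur != old {
		return false
	}
	s.data[key] = next
	return true
}

// Issue calls send only for the request that wins the claim on the ID.
func Issue(s *casStore, id, bonusType string, amount int64, send func() error) error {
	claim := Record{BonusType: bonusType, Amount: amount, Status: Processing}
	if prev, inserted := s.PutIfAbsent(id, claim); !inserted {
		// A retry must carry the same reward type and value as the original.
		if prev.BonusType != bonusType || prev.Amount != amount {
			return ErrMismatch
		}
		switch prev.Status {
		case Succeeded:
			return nil // already issued, nothing to do
		case Processing:
			return ErrInFlight
		}
		// The previous attempt failed: only one retry wins the CAS.
		if !s.CompareAndSwap(id, prev, claim) {
			return ErrInFlight
		}
	}
	err := send()
	final := claim
	final.Status = Succeeded
	if err != nil {
		final.Status = Failed
	}
	s.CompareAndSwap(id, claim, final)
	return err
}
```

A second call with the same ID, type and amount returns without calling the downstream again, while a call that reuses the ID with a different amount is rejected with ErrMismatch.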

The reward service provides two types of interfaces: synchronous reward issuance and asynchronous reward issuance. When the caller needs to know the issuance result, the upstream must use the synchronous interface. For example, reward event consumers need to know for certain whether issuance succeeded in order to decide whether a retry is needed. The stability and response latency of the synchronous interface depend heavily on the downstream services. The downstream issuance logic of some rewards is heavy and time-consuming, which can easily lead to upstream call timeouts and reduced stability.

For scenarios that do not need to know the issuance result in real time, or that are very sensitive to interface response latency, the upstream should use the asynchronous reward issuance interface. The asynchronous interface returns once the request has passed budget control and been written to the message queue. It improves system throughput and reduces upstream waiting time. By using the peak shaving and asynchronous capabilities of the message queue, the reward service can directly serve medium-scale issuance scenarios (100,000 to 500,000 QPS). Large-scale issuance scenarios (above 500,000 QPS) must access the system through the reward SDK. Unlike the synchronous interface, the asynchronous interface provides general failure retry logic and exception handling, so the access party does not need to develop this logic again, which reduces R&D investment.

3.3.1 Synchronous awards

The synchronous award distribution interface returns the account entry result from the downstream system in real time. The upstream service is responsible for handling failed requests; the reward service does not take over their retries. The process of synchronous award distribution is shown in the figure below:

In the above flowchart, the "write message queue" and "add record" nodes can each be configured as a strong or a weak dependency according to the scenario requirements. When they are configured as weak dependencies, the reward service cannot strictly guarantee global idempotence, and idempotence must instead be guaranteed by the downstream systems. In return, the reward service can keep serving requests normally when the message queue or the Abase storage system suffers an outage.

3.3.2 Asynchronous awards

Even when the upstream calls the asynchronous award interface, the budget control service is still called synchronously within the upstream request to deduct the budget. In the asynchronous award process, the interface returns as soon as the award request has been successfully written to the message queue. The subsequent award process is triggered when the consumer service of the reward system consumes the message, and the consumer service ensures that the reward is eventually recorded.

While processing an asynchronous award request, if the downstream system returns a non-retryable error, the failed request is written to a dedicated failure queue and archived in a Hive table for subsequent processing.

3.3.3 Budget Control

Budget control is one of the means of ensuring the safety of funds. In the Spring Festival activities, in addition to the frequency control logic and budget control strategy of the event gameplay itself, the reward system, the asset middle office and the downstream account services each have their own budget control strategies.

The scenario budget in the reward system is configured through TCC and can be adjusted dynamically. Budget consumption is stored in KV storage. To avoid hotspot keys, the budget is split across multiple keys according to the traffic of the access scenario, so that a single budget key carries fewer than 500 QPS. When deducting budget, the budget key is selected by hashing the unique order number, and the most recent order numbers are stored in the value of that budget key. Together with the CAS capability of the storage system, this provides limited idempotence for budget deduction: if highly concurrent requests hit a single budget key and a stored order number is evicted, a retry of that order is deducted again and the budget is over-deducted. For this reason, a certain proportion of headroom is added when configuring the budget, to prevent false interception caused by uneven traffic and over-deduction.
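The sharded budget with limited idempotence can be sketched as follows. This is an in-memory approximation under stated assumptions: a mutex per shard stands in for the storage system's CAS on a single key, FNV-1a is an arbitrary choice of hash, and Budget, ShardCount, Deduct and Used are hypothetical names.

```go
package main

import (
	"hash/fnv"
	"sync"
)

// budgetShard models the value of one budget key.
type budgetShard struct {
	mu     sync.Mutex // stands in for the store's CAS on a single key
	used   int64
	recent []string // most recent order numbers, oldest first
}

// Budget splits a scenario budget evenly across several keys.
type Budget struct {
	shards   []*budgetShard
	perShard int64 // budget limit of each shard
	keep     int   // how many recent order numbers each shard remembers
}

// ShardCount returns enough keys for each to carry roughly 500 QPS at most.
func ShardCount(expectedQPS int) int {
	n := (expectedQPS + 499) / 500
	if n < 1 {
		n = 1
	}
	return n
}

// NewBudget expects shards >= 1.
func NewBudget(total int64, shards, keep int) *Budget {
	b := &Budget{perShard: total / int64(shards), keep: keep}
	for i := 0; i < shards; i++ {
		b.shards = append(b.shards, &budgetShard{})
	}
	return b
}

// pick selects the budget key by hashing the unique order number.
func (b *Budget) pick(orderNo string) *budgetShard {
	h := fnv.New32a()
	h.Write([]byte(orderNo))
	return b.shards[h.Sum32()%uint32(len(b.shards))]
}

// Deduct reports whether the order fits in the budget. A retry is recognised
// only while its order number is still among the shard's recent entries.
func (b *Budget) Deduct(orderNo string, amount int64) bool {
	s := b.pick(orderNo)
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, seen := range s.recent {
		if seen == orderNo {
			return true // already deducted, do not deduct again
		}
	}
	if s.used+amount > b.perShard {
		return false
	}
	s.used += amount
	s.recent = append(s.recent, orderNo)
	if len(s.recent) > b.keep {
		s.recent = s.recent[1:] // evict the oldest order number
	}
	return true
}

// Used returns the total amount deducted across all shards.
func (b *Budget) Used() int64 {
	var sum int64
	for _, s := range b.shards {
		s.mu.Lock()
		sum += s.used
		s.mu.Unlock()
	}
	return sum
}
```

With a single shard that remembers two order numbers, retrying order "a" immediately is free, but retrying it after orders "b" and "c" have pushed it out of the list deducts a second time. This is the over-deduction case that the configured headroom is meant to absorb.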

In the asset middle office system, a multi-key transactional budget control scheme is implemented on top of Lua script execution, providing relatively strict budget control. In the downstream account services, budgets are strictly controlled through relational database transactions to ensure that there is no over-issuance in event scenarios.

4. Conclusion

The Spring Festival event was officially launched on January 24, 2022 and ended on January 31, 2022 (New Year's Eve), and lasted for 7 days. During the event, about 7 billion rewards were issued through the reward system, 2 billion of them on New Year's Eve alone. Across the many red envelope rains, the reward system achieved reliable processing of all messages from the production end to the consumer end. Offline reconciliation detected no actual discrepancies, and all cash rewards were successfully recorded.

The Spring Festival activities place extremely high requirements on the performance, stability and reliability of the related services. When designing technical solutions, the technology selection differs from that for conventional requirements, and performance and reliability need to be weighed against each other among the available components. Reducing system complexity, reducing external dependencies, and fully understanding the remaining dependencies are the keys to ensuring the stability and reliability of the entire system.
