Do you really understand real-time computing?


What is real-time computing?


Let’s take the statistics of hot-selling products as an example and look at the traditional calculation method:

  1. Clean user behavior, logs, and other information, and save them in the database.
  2. Save order information in the database.
  3. Use triggers or background routines to build local indexes or remote standalone indexes.
  4. Join the order, order-detail, user, and product tables, aggregate hot-selling products over the last 20 minutes, and return the top 10 (a sketch of this query follows the list).
  5. Display the results on the web or in the app.
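
To make step 4 concrete, here is a hedged sketch of what that aggregation might look like over JDBC. The table and column names (orders, order_details, products, created_at, and so on) are assumptions for illustration only, not a real schema:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Step 4 of the traditional approach: one SQL aggregation over the joined
// order tables. All table and column names below are hypothetical.
public class HotProductsQuery {
    private static final String TOP_10_SQL =
        "SELECT p.id, p.name, SUM(d.quantity) AS sold "
      + "FROM orders o "
      + "JOIN order_details d ON d.order_id = o.id "
      + "JOIN products p ON p.id = d.product_id "
      + "WHERE o.created_at >= NOW() - INTERVAL 20 MINUTE "
      + "GROUP BY p.id, p.name "
      + "ORDER BY sold DESC LIMIT 10";

    public static void printTop10(Connection conn) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(TOP_10_SQL);
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                System.out.println(rs.getString("name") + ": " + rs.getLong("sold"));
            }
        }
    }
}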

This is a hypothetical scenario, but if you have dealt with anything similar, you will recognize some of the following issues and difficulties:

  1. Scale-out
    Obviously, for an e-commerce site of any real scale, the data volume is huge. And because order data involves transactions, it is hard to simply abandon the transactional guarantees of a relational database and migrate to a NoSQL database with better scale-out characteristics.

    So sharding is the usual answer. Historical data is easy to handle: archive it by date and cache results computed by offline batch jobs.
    The requirement here, however, is "within 20 minutes", and that is hard.
  2. Performance
    This issue goes hand in hand with scale-out. Suppose we shard: because each table is now scattered across many nodes, we have to query it many times and do the aggregation at the business layer (a sketch of this scatter-gather merge follows this list).

    The question is: how often can we hit the database and still meet the 20-minute freshness requirement?
    What about 10 minutes?
    What about 5 minutes?
    What about real time?
    Moreover, the business layer itself runs into the limits of single-node computing power and has to scale horizontally, which drags consistency concerns back in.
    Everything starts to look complicated.
  3. Business expansion: suppose we need not only hot-product statistics but also ad-click counts, or we want to infer user traits from access behavior on the fly so we can adjust what each user sees to better match their latent needs. The business layer only gets more complicated.
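
To make point 2 concrete, here is a minimal sketch of the business-layer merge, assuming each shard can already answer "per-product sales in the last 20 minutes" locally. The class and method names are illustrative, not a real API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Scatter-gather at the business layer: each element of perShardCounts is one
// shard's local result (product -> count); merge them by summing counts.
public class HotProductsMerger {
    public static Map<String, Long> mergeShardCounts(List<Map<String, Long>> perShardCounts) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> shard : perShardCounts) {
            shard.forEach((product, count) -> merged.merge(product, count, Long::sum));
        }
        return merged; // sort by value and keep the top 10 afterwards
    }
}

Every refresh repeats this fan-out, which is exactly why the "how often can we hit the database" question above has no comfortable answer.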

Maybe you have a better way, but actually, what we need is a new understanding:


What happens in this world happens in real time.
So we need a real-time computing model, not a batch-processing model.
The model we need must handle large volumes of data, so it must have good scale-out capability. Most importantly, it should spare us from worrying too much about consistency and replication.


This computing model is a real-time computing model, which can also be thought of as a stream computing model.

Now assuming we have such a model, we can happily design new business scenarios:

  1. What is the most forwarded Weibo post?
  2. What are the most popular products?
  3. What are the hot topics that everyone is searching for?
  4. Which of our ads, and in which position, gets the most clicks?

Or, we can ask:


What is happening in this world?


What is the hottest topic on Weibo?

We use a simple sliding window counting problem to unveil the mystery of so-called real-time computing.

Assume that our business requirements are:


Count the 10 hottest Weibo topics within 20 minutes.


To solve this problem, we need to consider:

  1. Assume our data comes from topics pushed over long-lived (persistent) connections from Weibo.
  2. For the problem model, we consider topics enclosed in a pair of # characters. The hottest topic is the one that appears more often than any other.
    For example, in "@foreach_break: Hello, #world#, I love you, #Weibo#.",
    "world" and "Weibo" are the topics (a sketch of this extraction follows this list).
  3. We use Storm as the computing engine.
  4. We need to define time.
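
As promised in item 2, here is a minimal sketch of the topic extraction. The TopicExtractor class is illustrative, but a real spout would run something similar over each incoming post before emitting one tuple per topic:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Weibo topics are enclosed in a pair of '#' characters, e.g. "#world#".
public class TopicExtractor {
    private static final Pattern TOPIC = Pattern.compile("#([^#]+)#");

    public static List<String> extract(String post) {
        List<String> topics = new ArrayList<>();
        Matcher m = TOPIC.matcher(post);
        while (m.find()) {
            topics.add(m.group(1)); // the text between the two '#'
        }
        return topics;
    }

    public static void main(String[] args) {
        // prints [world, Weibo]
        System.out.println(extract("@foreach_break: Hello, #world#, I love you, #Weibo#."));
    }
}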

How to define time?

Defining time is a hard problem, and how you do it depends on how much precision you need.
In practice, we generally use a tick to represent the notion of a moment in time.

In Storm's infrastructure, a timer set up during the executor startup phase fires this "a period of time has passed" event, as shown below:

;; From Storm's executor.clj: during executor startup, schedule a recurring
;; timer that publishes a system tick tuple into the executor's receive queue.
(defn setup-ticks! [worker executor-data]
  (let [storm-conf (:storm-conf executor-data)
        tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
        receive-queue (:receive-queue executor-data)
        context (:worker-context executor-data)]
    (when tick-time-secs
      (if (or (system-id? (:component-id executor-data))
              (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                   (= :spout (:type executor-data))))
        (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
        ;; every tick-time-secs seconds, publish a tick tuple on the
        ;; SYSTEM_TICK_STREAM_ID stream, tagged with SYSTEM_TASK_ID
        (schedule-recurring
          (:user-timer worker)
          tick-time-secs
          tick-time-secs
          (fn []
            (disruptor/publish
              receive-queue
              [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]])))))))
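
For completeness: in user code, a bolt opts in to ticks through its component configuration. Returning TOPOLOGY_TICK_TUPLE_FREQ_SECS from getComponentConfiguration is standard Storm API, and it is exactly the per-component setting that setup-ticks! reads above; the TickAwareBolt class itself is just an illustrative sketch:

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class TickAwareBolt extends BaseBasicBolt {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3); // one tick every 3 seconds
        return conf;
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // tick vs. normal tuple handling goes here (see isTick below)
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this sketch emits nothing
    }
}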

The previous post analyzed how these pieces of infrastructure relate to each other in detail; if anything here is unclear, refer back to it.

Every tick interval, such an event fires. When a bolt downstream in the stream receives it, the bolt can decide whether to keep incrementing counts or to aggregate its results and emit them into the stream.

How does a bolt determine whether a received tuple represents a "tick"?
The executor thread that manages the bolt consumes messages from the queue it subscribes to and calls the bolt's execute method, so inside execute you can check as follows:

public static boolean isTick(Tuple tuple) {
    return tuple != null
           && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
           && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}

Combined with the setup-ticks! Clojure code above, we can see that SYSTEM_TICK_STREAM_ID is passed to the tuple as a constructor argument in the timer callback. So where does SYSTEM_COMPONENT_ID come from?
As the following code shows, SYSTEM_TASK_ID is also passed to the tuple:

 ;; Note the SYSTEM_TASK_ID and SYSTEM_TICK_STREAM_ID
(TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)

Then use the following code to get the SYSTEM_COMPONENT_ID:

public String getComponentId(int taskId) {
    if (taskId == Constants.SYSTEM_TASK_ID) {
        return Constants.SYSTEM_COMPONENT_ID;
    } else {
        return _taskToComponent.get(taskId);
    }
}


Sliding Window

With the infrastructure above, we still need some machinery to finish the engineering work and turn the vision into reality.

Here, we look at the sliding window design by Michael G. Noll.


Note: The image is from http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/

Topology

String spoutId = "wordGenerator";
String counterId = "counter";
String intermediateRankerId = "intermediateRanker";
String totalRankerId = "finalRanker";
// Assume TestWordSpout is the source of our topic tuples
builder.setSpout(spoutId, new TestWordSpout(), 5);
// RollingCountBolt uses a 9-second window and emits its counts downstream every 3 seconds
builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
// IntermediateRankingsBolt does partial aggregation and ranks the top-n topics per task
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
// TotalRankingsBolt completes the global aggregation and ranks the final top-n topics
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);

The topology design above is as follows:


Note: The image is from http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/
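
For completeness, here is a minimal way to run the topology built above in local mode. This continues the builder variable from the snippet; the APIs (backtype.storm.Config, backtype.storm.LocalCluster) are standard Storm, and the topology name is arbitrary:

// assumes: import backtype.storm.Config; import backtype.storm.LocalCluster;
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("rolling-top-words", conf, builder.createTopology());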

Combining Aggregate Calculations with Time

Earlier we described the tick event, whose callback ultimately triggers the bolt's execute method. So we can do the following:

RollingCountBolt:

@Override
public void execute(Tuple tuple) {
    if (TupleUtils.isTick(tuple)) {
        LOG.debug("Received tick tuple, triggering emit of current window counts");
        // A tick arrived: emit the counts for the current window and slide the window forward
        emitCurrentWindowCounts();
    } else {
        // A regular tuple: just count the topic
        countObjAndAck(tuple);
    }
}

// obj is the topic; increment its count.
// Note that the rate here is whatever the stream delivers, which may be
// millions of tuples per second or just tens per second.
// Running out of memory? The bolt can scale out.
private void countObjAndAck(Tuple tuple) {
    Object obj = tuple.getValue(0);
    counter.incrementCount(obj);
    collector.ack(tuple);
}

// Emit the counts for the window downstream
private void emitCurrentWindowCounts() {
    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
    lastModifiedTracker.markAsModified();
    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
        LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
    }
    emit(counts, actualWindowLengthInSeconds);
}

The code above may seem a bit abstract, but the picture makes it clear: when a tick arrives, the window slides forward:


Note: The image is from http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/
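
To make the window mechanics concrete, here is a minimal slot-based sliding-window counter in the spirit of the counter used above. With a 9-second window and a 3-second emit frequency there are 9 / 3 = 3 slots; each emission sums all slots and then reuses the oldest one. This is an illustrative sketch, not Michael Noll's actual class:

import java.util.HashMap;
import java.util.Map;

public class SimpleSlidingWindowCounter<T> {
    private final Map<T, long[]> slotsByObj = new HashMap<>();
    private final int numSlots;   // windowLength / emitFrequency, e.g. 9 / 3 = 3
    private int headSlot = 0;     // the slot currently being written

    public SimpleSlidingWindowCounter(int numSlots) {
        this.numSlots = numSlots;
    }

    // Count one occurrence of obj in the current slot.
    public void incrementCount(T obj) {
        slotsByObj.computeIfAbsent(obj, k -> new long[numSlots])[headSlot]++;
    }

    // Sum all slots per object, then slide the window forward one slot.
    public Map<T, Long> getCountsThenAdvanceWindow() {
        Map<T, Long> totals = new HashMap<>();
        for (Map.Entry<T, long[]> e : slotsByObj.entrySet()) {
            long sum = 0;
            for (long c : e.getValue()) sum += c;
            totals.put(e.getKey(), sum);
        }
        headSlot = (headSlot + 1) % numSlots;
        // Wipe the slot that just became current so it can collect fresh counts;
        // a production version would also prune objects whose slots are all zero.
        for (long[] slots : slotsByObj.values()) {
            slots[headSlot] = 0;
        }
        return totals;
    }
}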

IntermediateRankingsBolt & TotalRankingsBolt:

public final void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleUtils.isTick(tuple)) {
        getLogger().debug("Received tick tuple, triggering emit of current rankings");
        // Emit the aggregated and sorted rankings downstream
        emitRankings(collector);
    } else {
        // Aggregate and sort
        updateRankingsWithTuple(tuple);
    }
}

The aggregate-and-sort methods of IntermediateRankingsBolt and TotalRankingsBolt differ slightly:

IntermediateRankingsBolt's aggregate sorting method:

// IntermediateRankingsBolt's aggregate-and-sort method
@Override
void updateRankingsWithTuple(Tuple tuple) {
    // Extract the topic and its occurrence count from the tuple
    Rankable rankable = RankableObjectWithFields.from(tuple);
    // Merge the count into the rankings, then re-sort all topics
    super.getRankings().updateWith(rankable);
}

TotalRankingsBolt's aggregate sorting method:

// TotalRankingsBolt's aggregate-and-sort method
@Override
void updateRankingsWithTuple(Tuple tuple) {
    // Pull the intermediate rankings produced by IntermediateRankingsBolt out of the tuple
    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
    // Merge and sort
    super.getRankings().updateWith(rankingsToBeMerged);
    // Prune zero counts to save memory
    super.getRankings().pruneZeroCounts();
}

The re-sorting method is simple and brute-force, because only the top N are needed and N will not be very large:

private void rerank() {
    Collections.sort(rankedItems);
    Collections.reverse(rankedItems);
}
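
Putting the pieces together, here is a self-contained sketch of that ranking logic: drop the stale entry for a topic, add the fresh count, re-sort descending, and truncate to the top N. Class and method names are illustrative, not the actual Rankings code:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class TopicCount implements Comparable<TopicCount> {
    final String topic;
    final long count;

    TopicCount(String topic, long count) {
        this.topic = topic;
        this.count = count;
    }

    @Override
    public int compareTo(TopicCount other) {
        return Long.compare(count, other.count);
    }
}

class SimpleRankings {
    private final int maxSize; // the N in top-N
    private final List<TopicCount> rankedItems = new ArrayList<>();

    SimpleRankings(int maxSize) {
        this.maxSize = maxSize;
    }

    // Replace any stale entry for the same topic, then re-rank.
    void updateWith(TopicCount fresh) {
        rankedItems.removeIf(tc -> tc.topic.equals(fresh.topic));
        rankedItems.add(fresh);
        rerank();
        // N is small, so sort-then-truncate is cheap enough.
        while (rankedItems.size() > maxSize) {
            rankedItems.remove(rankedItems.size() - 1);
        }
    }

    private void rerank() {
        Collections.sort(rankedItems);    // ascending by count
        Collections.reverse(rankedItems); // descending, hottest first
    }
}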

Conclusion

The figure below shows the kind of result we want: the statistics of hot topics between t0 and t1. (The @foreach_break watermark is only an anti-piracy mark :].)

This article has explained the concept and the key code of sliding-window counting in detail. If anything is still unclear, refer to the design and source code at http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/.

Hope you understand what real-time computing is :]
