A brief discussion on Java's Fork/Join concurrency framework

1. What is Fork/Join

Oracle's official definition is roughly as follows: the Fork/Join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It recursively splits a large task into smaller tasks that run concurrently, using all available processing power to improve the performance of the application.

Because Fork/Join implements ExecutorService, its tasks are also executed by worker threads in a thread pool. The difference is that it uses a work-stealing algorithm: idle threads can steal tasks from busy threads and help execute them. (My personal understanding of work stealing: each worker thread in the pool has its own task queue, and the threads do not interfere with each other, so each thread normally takes tasks from its own queue. If a thread's queue becomes empty while other threads' queues still hold tasks, the idle thread takes a task from one of those queues and executes it, as if stealing someone else's work. In ForkJoinPool these queues are double-ended: the owner works at one end and thieves take from the other, which keeps contention low.)
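
To make the queue behaviour described above more concrete, here is a toy model. It is not how ForkJoinPool is implemented internally; it only assumes that each worker owns a double-ended queue, takes its own work from one end, and that an idle worker takes from the opposite end of a busy worker's queue. The names WorkStealingSketch and nextTask are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class WorkStealingSketch {

    // Returns the next task for a worker: its own newest task if it has one,
    // otherwise the oldest task of the victim, otherwise null.
    static String nextTask(Deque<String> own, Deque<String> victim) {
        String task = own.pollLast();       // owner takes from the tail of its own deque (assumed end)
        if (task == null) {
            task = victim.pollFirst();      // idle worker steals from the head of another deque
        }
        return task;
    }

    public static void main(String[] args) {
        Deque<String> busy = new ArrayDeque<>();
        Deque<String> idle = new ArrayDeque<>();
        busy.addLast("task-1");
        busy.addLast("task-2");
        busy.addLast("task-3");

        System.out.println(nextTask(busy, idle)); // prints task-3 (own newest task)
        System.out.println(nextTask(idle, busy)); // prints task-1 (stolen oldest task)
    }
}
```

Because the owner and the thief work at opposite ends, they rarely compete for the same task.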

The core of the Fork/Join framework is the ForkJoinPool class, which extends AbstractExecutorService. It implements the work-stealing algorithm and executes ForkJoinTask instances.

The following is the original text quoted from Oracle's official definition:

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.
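
Since ForkJoinPool extends AbstractExecutorService, it can be used wherever an ExecutorService is expected. The following is a small sketch of that relationship; the class name PoolBasics, the helper newPool, and the parallelism value 4 are my own choices for illustration.

```java
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

class PoolBasics {

    // Creates a pool with the given target parallelism (number of worker threads).
    static ForkJoinPool newPool(int parallelism) {
        return new ForkJoinPool(parallelism);
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = newPool(4);
        ExecutorService asExecutor = pool;  // a ForkJoinPool is an ExecutorService

        System.out.println(pool instanceof AbstractExecutorService); // prints true
        System.out.println(pool.getParallelism());                   // prints 4
        // An ordinary Callable can be submitted just like on any other executor
        System.out.println(asExecutor.submit(() -> 1 + 1).get());    // prints 2

        pool.shutdown();
    }
}
```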

2. Basic usage of Fork/Join

(1) Fork/Join base class

As mentioned above, Fork/Join is about splitting a large task into several small tasks, so the first step is of course to split the tasks, roughly as follows:

    if (this task is small enough) {
        perform the task directly
    } else {
        split the task into two smaller parts
        execute both parts and wait for their results
    }
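
The outline above can be turned into runnable code in many ways. As a minimal sketch, here is a task that sums an int array; the class name SumTask, the helper sum, and the threshold of 1,000 elements are assumptions made for this example only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off for "small enough"

    private final int[] data;
    private final int start;
    private final int end;

    SumTask(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: do the work directly
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Too large: split into two halves
        int middle = (start + end) >>> 1;
        SumTask left = new SumTask(data, start, middle);
        SumTask right = new SumTask(data, middle, end);
        left.fork();                       // run the left half asynchronously
        long rightSum = right.compute();   // compute the right half in this thread
        return left.join() + rightSum;     // wait for the left half and combine
    }

    // Convenience entry point that runs the task on the common pool
    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // prints 50005000
    }
}
```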

To implement a ForkJoinTask, we need a base class that extends RecursiveTask or RecursiveAction, and we put the logic above into its compute method according to our business needs. Both RecursiveTask and RecursiveAction extend ForkJoinTask. The difference between them is that RecursiveTask returns a value while RecursiveAction does not. The following is a demo I wrote that selects the elements containing "a" from a list of strings:

    @Override
    protected List<String> compute() {
        // When the range between start and end is smaller than the threshold, do the actual filtering
        if (end - start < threshold) {
            List<String> temp = list.subList(start, end);
            return temp.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList());
        } else {
            // Otherwise break the big task into two small tasks
            int middle = (start + end) / 2;
            ForkJoinTest left = new ForkJoinTest(list, start, middle, threshold);
            ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold);
            // Run the two "small tasks" in parallel: fork the left one and
            // compute the right one in the current thread
            left.fork();
            List<String> rightResult = right.compute();
            // Combine the results of the two "small tasks" in a fresh list,
            // because Collectors.toList() does not guarantee a mutable list
            List<String> result = new ArrayList<>(left.join());
            result.addAll(rightResult);
            return result;
        }
    }
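
The demo above returns a value, so it builds on RecursiveTask. For comparison, here is a rough sketch of the other option, RecursiveAction, which returns nothing and instead modifies shared data in place. The class name DoubleAction, the helper doubleAll, and the threshold of 4 are hypothetical and chosen only for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // assumed cut-off for "small enough"

    private final int[] data;
    private final int start;
    private final int end;

    DoubleAction(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: double every element of this slice in place
            for (int i = start; i < end; i++) {
                data[i] *= 2;
            }
        } else {
            // Split into two halves and wait until both have finished
            int middle = (start + end) >>> 1;
            invokeAll(new DoubleAction(data, start, middle),
                      new DoubleAction(data, middle, end));
        }
    }

    // Convenience entry point that runs the action on the common pool
    static void doubleAll(int[] data) {
        ForkJoinPool.commonPool().invoke(new DoubleAction(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        doubleAll(data);
        System.out.println(Arrays.toString(data)); // prints [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```

Each subtask works on its own slice of the array, so the halves never write to the same element.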

(2) Execution class

Once the base class is finished, it can be called. We first need a Fork/Join thread pool, ForkJoinPool, then submit a ForkJoinTask to it and fetch the result. The submit method of ForkJoinPool takes a ForkJoinTask and also returns a ForkJoinTask, whose get method returns the execution result.

The code is as follows:

    ForkJoinPool pool = new ForkJoinPool();
    // Submit a decomposable ForkJoinTask
    ForkJoinTask<List<String>> future = pool.submit(forkJoinService);
    System.out.println(future.get());
    // Shut down the thread pool
    pool.shutdown();

With that, a simple Fork/Join program is complete.
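
Besides submit followed by get, ForkJoinPool also offers invoke, which runs the task and returns its result directly. The sketch below compares the two calls; the class SubmitVersusInvoke and its trivial Answer task are hypothetical and exist only to keep the example self-contained.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class SubmitVersusInvoke {

    // Trivial hypothetical task that just returns a constant
    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // submit() returns immediately with a ForkJoinTask; get() blocks for the result
    static int viaSubmit(ForkJoinPool pool) throws ExecutionException, InterruptedException {
        ForkJoinTask<Integer> future = pool.submit(new Answer());
        return future.get();
    }

    // invoke() blocks until the task is done and returns the result directly
    static int viaInvoke(ForkJoinPool pool) {
        return pool.invoke(new Answer());
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            System.out.println(viaSubmit(pool)); // prints 42
            System.out.println(viaInvoke(pool)); // prints 42
        } finally {
            pool.shutdown();
        }
    }
}
```

submit is the better fit when the caller has other work to do before it needs the result; invoke is shorter when it would block right away anyway.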

Tip: In Java 8, the parallelSort() methods of java.util.Arrays and the parallel stream operations in the java.util.stream package also use Fork/Join. (Careful readers may notice that I also use a parallel stream inside the Fork/Join task, so the explicit Fork/Join here is actually redundant, because parallel streams already run on Fork/Join. This is only a demo, though, so it does not matter that it has no practical use.)

Quoting the official text:

One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems.

Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release.
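
As a quick illustration of the two built-in facilities mentioned in the quote, the sketch below sorts an array with Arrays.parallelSort and filters a list with a parallel stream. The class name BuiltInParallelism and its two helper methods are my own; no explicit ForkJoinPool is created here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BuiltInParallelism {

    // Sorts a copy of the input using the parallel sort from java.util.Arrays
    static int[] sortedCopy(int[] values) {
        int[] copy = Arrays.copyOf(values, values.length);
        Arrays.parallelSort(copy);
        return copy;
    }

    // Same filter as the demo in this article, written as a single parallel stream
    static List<String> containingA(List<String> words) {
        return words.parallelStream()
                .filter(s -> s.contains("a"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sortedCopy(new int[] {5, 3, 9, 1}))); // prints [1, 3, 5, 9]
        System.out.println(containingA(Arrays.asList("a", "b", "ba", "te")));    // prints [a, ba]
    }
}
```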

Attached is the complete code for future reference:

1. Define an abstract class (kept only for extensibility; it serves no practical purpose in this example and can be omitted):

    import java.util.concurrent.RecursiveTask;

    /**
     * Description: ForkJoin interface
     * Designer: jack
     * Date: 2017/8/3
     * Version: 1.0.0
     */
    public abstract class ForkJoinService<T> extends RecursiveTask<T> {
        @Override
        protected abstract T compute();
    }

2. Define the base class

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    /**
     * Description: ForkJoin base class
     * Designer: jack
     * Date: 2017/8/3
     * Version: 1.0.0
     */
    public class ForkJoinTest extends ForkJoinService<List<String>> {

        private final int threshold;     // Threshold
        private final List<String> list; // List to be split

        private ForkJoinTest(List<String> list, int threshold) {
            this.list = list;
            this.threshold = threshold;
        }

        @Override
        protected List<String> compute() {
            // When the list is shorter than the threshold, do the actual filtering
            if (list.size() < threshold) {
                return list.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList());
            } else {
                // Otherwise split the large task into two small tasks
                int middle = list.size() / 2;
                List<String> leftList = list.subList(0, middle);
                List<String> rightList = list.subList(middle, list.size());
                ForkJoinTest left = new ForkJoinTest(leftList, threshold);
                ForkJoinTest right = new ForkJoinTest(rightList, threshold);
                // Run the two "small tasks" in parallel: fork the left one and
                // compute the right one in the current thread
                left.fork();
                List<String> rightResult = right.compute();
                // Combine the results of the two "small tasks" in a fresh list,
                // because Collectors.toList() does not guarantee a mutable list
                List<String> result = new ArrayList<>(left.join());
                result.addAll(rightResult);
                return result;
            }
        }

        /**
         * Get a ForkJoinTest instance. A new task is created on every call:
         * a cached singleton would ignore the new arguments and keep the
         * result of its first run.
         * @param list List to be processed
         * @param threshold threshold
         * @return ForkJoinTest instance
         */
        public static ForkJoinService<List<String>> getInstance(List<String> list, int threshold) {
            return new ForkJoinTest(list, threshold);
        }
    }

3. Execution class

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;

    /**
     * Description: Fork/Join execution class
     * Designer: jack
     * Date: 2017/8/3
     * Version: 1.0.0
     */
    public class Test {

        public static void main(String[] args) throws ExecutionException, InterruptedException {

            String[] strings = {"a", "ah", "b", "ba", "ab", "ac", "sd", "fd", "ar", "te", "se", "te",
                    "sdr", "gdf", "df", "fg", "gh", "oa", "ah", "qwe", "re", "ty", "ui"};
            List<String> stringList = new ArrayList<>(Arrays.asList(strings));

            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinService<List<String>> forkJoinService = ForkJoinTest.getInstance(stringList, 20);
            // Submit a decomposable ForkJoinTask
            ForkJoinTask<List<String>> future = pool.submit(forkJoinService);
            System.out.println(future.get());
            // Shut down the thread pool
            pool.shutdown();
        }
    }
