All about Parallel Streams

In this article, we assume that you already work with the Streams introduced in Java 8 and know how to use the different kinds of stream operations. If you are new to Streams, please refer to this article: http://talks.skilltoz.com/streams-in-java-8/

Overview

Parallel streams are streams that can operate on multiple threads, i.e., the stream is split into multiple portions and a thread pool is used to process these portions independently. This can improve runtime performance on large data sets by leveraging multi-core architectures.

Creating a Parallel Stream

Let us look at how we can perform stream operations in parallel without having to write any actual multithreading code. When we create a stream, it is always a serial stream unless we explicitly specify that it should be a parallel stream.

A parallel Stream can be created by invoking the parallel() method on Stream or the parallelStream() method on a Collection.

Consider the printInSerial() method in the example below.

public static void printInSerial() {
 Stream.of("USA ", "Canada ", "India ", "China ", "Japan ")
   .forEach(System.out::print);
 System.out.println("-Printed Serially");
}

It always prints the given country names in exactly the given order, as it prints serially. 

You can make it a parallel Stream by invoking the parallel() method as shown below. Here, the country names may be printed in a different order for each execution. This happens because the portions of the Stream are processed by multiple threads.

public static void printInParallel() {
 Stream.of("USA ", "Canada ", "India ", "China ", "Japan ")
   .parallel().forEach(System.out::print);
 System.out.println("-Printed Parallelly");
}
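
As a rough sketch of the two creation routes described above, the class below checks each stream with isParallel(). The class name ParallelStreamCreation and its helper methods are hypothetical and exist only for illustration. Note that the Collection documentation describes parallelStream() as returning a "possibly parallel" stream; for a list created with Arrays.asList, as here, the stream is parallel.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only.
class ParallelStreamCreation {

    // Sample data reusing the country names from the examples above.
    static final List<String> COUNTRIES =
            Arrays.asList("USA", "Canada", "India", "China", "Japan");

    // A stream obtained from stream() is sequential by default.
    static boolean sequentialByDefault() {
        return COUNTRIES.stream().isParallel();
    }

    // parallel() switches an existing stream to parallel mode.
    static boolean viaParallel() {
        return COUNTRIES.stream().parallel().isParallel();
    }

    // parallelStream() on a Collection requests a parallel stream directly.
    static boolean viaParallelStream() {
        return COUNTRIES.parallelStream().isParallel();
    }

    public static void main(String[] args) {
        System.out.println("stream():                  " + sequentialByDefault());
        System.out.println("stream().parallel():       " + viaParallel());
        System.out.println("parallelStream():          " + viaParallelStream());
    }
}
```

Calling isParallel() is a cheap way to confirm which mode a pipeline will run in before a terminal operation is invoked.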

Ordering in Parallel Streams

When parallel streams are executed, the Java compiler and runtime determine the order in which the stream's elements are processed so as to maximize the benefits of parallel computing (unless otherwise specified by the stream operation). Hence, you cannot rely on an operation such as forEach to process the elements in their original order.

Consider the below example where a collection is filtered, mapped and then sorted before the forEach operation.

List<String> flowers = Arrays.asList("Jasmine", "Rose", "Lotus", "Lily", "Chrysanthemum", "Marigold", "Daffodils");
flowers
   .parallelStream()
   .filter(str -> str.length() > 4)
   .map(String::toUpperCase)
   .sorted()
   .forEach(System.out::println);

Here, the elements of the list may appear in an apparently random order in spite of the sorting, because forEach does not respect the order of a parallel stream. To maintain the ordering in the above example, we need to replace forEach in the last statement with forEachOrdered.

forEachOrdered processes the elements in the encounter order of the stream, which in the above example is the sorted order produced by sorted(). However, if you use operations like forEachOrdered with parallel streams, the benefits of parallelism might be lost.
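
The ordering behaviour can be sketched as follows, reusing the flower list from the example above. The class name OrderedParallelExample and the method sortedLongNames are hypothetical. The method adds each element to a list from within forEachOrdered so that the resulting order can be inspected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only.
class OrderedParallelExample {

    static List<String> sortedLongNames(List<String> flowers) {
        List<String> result = new ArrayList<>();
        flowers.parallelStream()
               .filter(str -> str.length() > 4)
               .map(String::toUpperCase)
               .sorted()
               // forEachOrdered handles one element at a time, in encounter order,
               // and each action happens-before the next one, so adding to a
               // plain ArrayList is acceptable here (it would not be with forEach).
               .forEachOrdered(result::add);
        return result;
    }

    public static void main(String[] args) {
        List<String> flowers = Arrays.asList("Jasmine", "Rose", "Lotus", "Lily",
                "Chrysanthemum", "Marigold", "Daffodils");
        // Prints [CHRYSANTHEMUM, DAFFODILS, JASMINE, LOTUS, MARIGOLD]
        System.out.println(sortedLongNames(flowers));
    }
}
```

In everyday code, collect(Collectors.toList()) would be the more usual way to gather results; the list is filled inside forEachOrdered here only to make the ordering guarantee visible.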

Getting the Thread Information

When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams and processes them in parallel using threads from the common ForkJoinPool.

By default, this pool has one fewer thread than the number of available processors (with a minimum of one). Parallel streams can still use all the available processors because the calling thread, here the main thread, also takes part in the processing. This is demonstrated below.

// Print the available number of processors. Assume it is 4
System.out.println(Runtime.getRuntime().availableProcessors());
    
// Get the parallelism for the common ForkJoinPool. If the number of processors is 4, this prints 3 by default
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());

In the example below, let us print the current thread name at each step to understand which threads are used for execution.

Arrays.asList("BAT", "WALL", "BALL", "DOLL", "CALL")
  .parallelStream()
  .filter(s -> {
     System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName());
     return s.startsWith("B"); // Keep only strings starting with "B"
   })
  .map(s -> {
     System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName());
     return s.toLowerCase(); // Map to lowercase
   })
  .forEach(s -> System.out.format("for: %s [%s]\n", s, Thread.currentThread().getName()));

The output obtained may be as follows.

filter: BALL [main]
map: BALL [main]
for: ball [main]
filter: CALL [ForkJoinPool.commonPool-worker-3]
filter: WALL [ForkJoinPool.commonPool-worker-5]
filter: BAT [ForkJoinPool.commonPool-worker-7]
map: BAT [ForkJoinPool.commonPool-worker-7]
filter: DOLL [ForkJoinPool.commonPool-worker-5]
for: bat [ForkJoinPool.commonPool-worker-7]

You can see how the operations run in parallel on multiple threads, including the main thread. In consecutive executions, the output might be different because the threads actually used cannot be predicted.
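
If only the set of participating threads is of interest, rather than the full trace, a minimal sketch such as the following could be used. The class name ThreadNameCollector and the method threadsUsed are hypothetical. A concurrent set is used because several threads may add to it at the same time.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class, for illustration only.
class ThreadNameCollector {

    // Returns the distinct names of the threads that processed the elements.
    static Set<String> threadsUsed() {
        // Thread-safe set: several worker threads may call add() concurrently.
        Set<String> names = ConcurrentHashMap.newKeySet();
        Arrays.asList("BAT", "WALL", "BALL", "DOLL", "CALL")
              .parallelStream()
              .forEach(s -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // The result varies between runs and machines.
        System.out.println(threadsUsed());
    }
}
```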

Points to note

  • Parallelism is not always faster than performing operations serially; it depends on the size of the data and the number of processor cores. Prefer parallel streams only for large data sets
  • Prefer stateless operations. Stateful operations such as sorted() may need to see all the elements before producing a result, which limits the benefit of parallelism
  • Prefer operations for which order is not important. Order-dependent operations such as findFirst() can be costly on parallel streams; use findAny() where any match will do
  • Make sure that the code is thread-safe. If the operations running in parallel modify shared data, the outcome may be unpredictable
  • Avoid unnecessary autoboxing as it can affect performance; prefer primitive streams such as IntStream where possible
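
Two of the points above, thread safety and autoboxing, can be illustrated with a short sketch. The class name ParallelPitfalls and its methods are hypothetical. The first method gathers results with collect() instead of adding to a shared list from several threads; the second stays on primitive streams so that no Integer or Long objects are created.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, for illustration only.
class ParallelPitfalls {

    // Thread-safe: collect() lets the stream library merge the partial results
    // of each thread, rather than having all threads add to one shared ArrayList.
    static List<Integer> evenNumbers(int upperBound) {
        return IntStream.rangeClosed(1, upperBound)
                .parallel()
                .filter(n -> n % 2 == 0)
                .boxed() // boxing is needed only because the result is a List<Integer>
                .collect(Collectors.toList());
    }

    // No autoboxing: the values stay primitive from the source to the sum.
    static long sumOfSquares(int upperBound) {
        return IntStream.rangeClosed(1, upperBound)
                .parallel()
                .mapToLong(n -> (long) n * n)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(evenNumbers(10));  // prints [2, 4, 6, 8, 10]
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

Because collect() preserves the encounter order of an ordered source, evenNumbers returns the same list on every run even though the filtering happens in parallel.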

Practice Questions

For certification questions on the topic of parallel streams, visit http://talks.skilltoz.com/how-to-use-parallel-streams/

For the complete set of certification questions, visit http://talks.skilltoz.com/java-11-certification-exam-questions/
