Java 8 Parallel Stream custom ThreadPool

Carvia Tech | August 23, 2020 | 2 min read | 3,876 views | Multithreading and Concurrency


There are two approaches to configure custom sized thread pool for Java 8 parallel stream operations - configure default common pool and running parallel stream operation inside ForkJoinPool.

Java does not provide any direct mechanism to control the number of threads and ThreadPool used by parallel() method in stream API, but there are two indirect way to configure the same.

Configure default Common Pool

Its documented that parallel() method utilizes the common pool available per classloader per jvm, and we have a mechanism to control the configuration of that default common pool using below 3 System properties

java.util.concurrent.ForkJoinPool.common.parallelism

The parallelism level, a non-negative integer

java.util.concurrent.ForkJoinPool.common.threadFactory

The class name of a ForkJoinPool.ForkJoinWorkerThreadFactory

java.util.concurrent.ForkJoinPool.common.exceptionHandler

The class name of a Thread.UncaughtExceptionHandler

For example, set the System property before calling the parallel stream

Changing the default common pool size JVM wide
public class CustomCommonPoolSize {
    public void testParallelOperation() {
        long start = System.currentTimeMillis();
        IntStream s = IntStream.range(0, 20);
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
        s.parallel().forEach(i -> {
            try {
                Thread.sleep(100);
            } catch (Exception ignore) {}
            System.out.print((System.currentTimeMillis() - start) + " ms");
        });
        System.out.println("\nOverall time consumed: "+ (System.currentTimeMillis() - start)+" ms");
    }
}
Program output
192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192 192
Overall time consumed: 193 ms

Se see that all 20 tasks run in parallel and this the overall time is just 193 ms, even if individual task was taking 192ms each.

Run the parallel() operation inside a custom ForkJoinPool

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one. The trick is based on ForkJoinTask.

Fork documentation which specifies:

Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not in ForkJoinPool()

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

import static java.lang.Math.sqrt;
import static java.util.stream.Collectors.toList;

class StreamExampleJava8 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4); // Configure the number of threads
        forkJoinPool.submit(() -> IntStream.range(1, 1_000_000)
                .parallel()
                .filter(StreamExampleJava8::isPrime).boxed()
                .collect(toList()))
                .get();
        forkJoinPool.shutdown();
    }

    private static boolean isPrime(long n) {
        return n > 1 && IntStream.rangeClosed(2, (int) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

In above code block, fork join pool will create 4 threads and run the parallel operations inside this custom fork join pool.

That’s all for this tutorial.


Top articles in this category:
  1. Custom Thread pool implementation in Java
  2. How will you implement your custom threadsafe Semaphore in Java
  3. Count word frequency in Java
  4. How will you increment each element of an Integer array, using parallel operation
  5. Diamond Problem of Inheritance in Java 8
  6. Can two threads call two different synchronized instance methods of an Object?
  7. ThreadLocal with examples in Java

Find more on this topic:

Recommended books for interview preparation:
Buy interview books

Java & Microservices interview refresher for experienced developers.