From 43ba5e80a42c07b811e2f80343b87cb9b5475caf Mon Sep 17 00:00:00 2001 From: "thibault.faure" Date: Thu, 13 Oct 2022 01:38:18 +0200 Subject: [PATCH] BAEL-58616 Code for the Check If All Runnable Are Done article --- ...ompletionCheckerWithCompletableFuture.java | 46 +++++++++++++++++ ...mpletionCheckerWithThreadPoolExecutor.java | 51 +++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithCompletableFuture.java create mode 100644 core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithThreadPoolExecutor.java diff --git a/core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithCompletableFuture.java b/core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithCompletableFuture.java new file mode 100644 index 0000000000..dc46ca997b --- /dev/null +++ b/core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithCompletableFuture.java @@ -0,0 +1,46 @@ +package com.baeldung.donerunnables; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RunnableCompletionCheckerWithCompletableFuture { + + private static final Logger LOGGER = LoggerFactory.getLogger(RunnableCompletionCheckerWithCompletableFuture.class); + private static final int NUMBER_OF_RUNNABLES = 5; + private static final int PAUSE_MILLIS = 1000; + + private static Runnable RUNNABLE = () -> { + try { + LOGGER.info("launching runnable"); + Thread.sleep(PAUSE_MILLIS); + } catch (InterruptedException e) { + } + }; + + public static void main(String args[]) throws InterruptedException { + List runnables = IntStream.range(0, NUMBER_OF_RUNNABLES) + .mapToObj(x -> RUNNABLE) + .collect(Collectors.toList()); + CompletableFuture[] completableFutures = runAsynchronousTasks(runnables); + LOGGER.info("Right after the creation of the completable future array, every completable future is done: {}", isEveryCompletableFutureDone(completableFutures)); + Thread.sleep((NUMBER_OF_RUNNABLES + 1) * PAUSE_MILLIS); + LOGGER.info("After {} seconds, every completable future is done: {}", NUMBER_OF_RUNNABLES + 1, isEveryCompletableFutureDone(completableFutures)); + } + + public static CompletableFuture[] runAsynchronousTasks(List runnables) { + return runnables.stream() + .map(CompletableFuture::runAsync) + .toArray(CompletableFuture[]::new); + } + + public static boolean isEveryCompletableFutureDone(CompletableFuture[] completableFutures) { + return CompletableFuture.allOf(completableFutures) + .isDone(); + } + +} diff --git a/core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithThreadPoolExecutor.java b/core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithThreadPoolExecutor.java new file mode 100644 index 0000000000..17a13a1c19 --- /dev/null +++ b/core-java-modules/core-java-concurrency-2/src/main/java/com/baeldung/donerunnables/RunnableCompletionCheckerWithThreadPoolExecutor.java @@ -0,0 +1,51 @@ +package com.baeldung.donerunnables; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RunnableCompletionCheckerWithThreadPoolExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(RunnableCompletionCheckerWithCompletableFuture.class); + private static final int NUMBER_OF_RUNNABLES = 5; + private static final int PAUSE_MILLIS = 1000; + private static final int NUMBER_OF_THREADS = 5; + + private static Runnable RUNNABLE = () -> { + try { + LOGGER.info("launching runnable"); + Thread.sleep(PAUSE_MILLIS); + } catch (InterruptedException e) { + } + }; + + public static void main(String args[]) throws InterruptedException { + List runnables = IntStream.range(0, NUMBER_OF_RUNNABLES) + .mapToObj(x -> RUNNABLE) + .collect(Collectors.toList()); + ThreadPoolExecutor executor = createThreadPoolExecutor(runnables); + executor.shutdown(); + LOGGER.info("After a timeout of 0 seconds, every Runnable is done: {}", isEveryRunnableDone(executor, 0)); + Thread.sleep(100); + LOGGER.info("After a timeout of 100 milliseconds, every Runnable is done: {}", isEveryRunnableDone(executor, 100)); + Thread.sleep(2000); + LOGGER.info("After a timeout of 2 seconds, every Runnable is done: {}", isEveryRunnableDone(executor, 1500)); + } + + public static ThreadPoolExecutor createThreadPoolExecutor(List runnables) { + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(NUMBER_OF_THREADS); + runnables.forEach(executor::execute); + return executor; + } + + public static boolean isEveryRunnableDone(ThreadPoolExecutor executor, int timeout) throws InterruptedException { + return executor.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } + +}