From dbbd2a0a12872f8599ea312b24ffd02e36352750 Mon Sep 17 00:00:00 2001 From: emanueltrandafir1993 Date: Sat, 1 Jul 2023 13:24:01 +0200 Subject: [PATCH 1/2] BAEL-6579: completable future's thread pool --- .../threadpool/CustomCompletableFuture.java | 28 +++++++ .../CompletableFutureThreadPoolUnitTest.java | 82 +++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 core-java-modules/core-java-concurrency-basic-3/src/main/java/com/baeldung/concurrent/completablefuture/threadpool/CustomCompletableFuture.java create mode 100644 core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java diff --git a/core-java-modules/core-java-concurrency-basic-3/src/main/java/com/baeldung/concurrent/completablefuture/threadpool/CustomCompletableFuture.java b/core-java-modules/core-java-concurrency-basic-3/src/main/java/com/baeldung/concurrent/completablefuture/threadpool/CustomCompletableFuture.java new file mode 100644 index 0000000000..1f3997768e --- /dev/null +++ b/core-java-modules/core-java-concurrency-basic-3/src/main/java/com/baeldung/concurrent/completablefuture/threadpool/CustomCompletableFuture.java @@ -0,0 +1,28 @@ +package com.baeldung.concurrent.completablefuture.threadpool; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Supplier; + +public class CustomCompletableFuture extends CompletableFuture { + private static final Executor executor = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "Custom-Single-Thread")); + + public static CustomCompletableFuture supplyAsync(Supplier supplier) { + CustomCompletableFuture future = new CustomCompletableFuture<>(); + executor.execute(() -> { + try { + future.complete(supplier.get()); + } catch (Exception ex) { + future.completeExceptionally(ex); + } + }); + return future; + } + + @Override + public Executor defaultExecutor() { + return executor; + } + +} \ No newline at end of file diff --git a/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java b/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java new file mode 100644 index 0000000000..4f94f36131 --- /dev/null +++ b/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java @@ -0,0 +1,82 @@ +package com.baeldung.concurrent.completablefuture.threadpool; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.junit.jupiter.api.Test; + +public class CompletableFutureThreadPoolUnitTest { + + @Test + void whenUsingNonAsync_thenUsesMainThread() { + CompletableFuture name = CompletableFuture.supplyAsync(() -> "Baeldung"); + + CompletableFuture nameLength = name.thenApply(value -> { + printCurrentThread(); + return value.length(); + }); + + assertThat(nameLength).isCompletedWithValue(8); + } + + @Test + void whenUsingNonAsync_thenUsesCallersThread() throws InterruptedException { + Runnable test = () -> { + CompletableFuture name = CompletableFuture.supplyAsync(() -> "Baeldung"); + + CompletableFuture nameLength = name.thenApply(value -> { + printCurrentThread(); + return value.length(); + }); + + assertThat(nameLength).isCompletedWithValue(8); + }; + + new Thread(test, "test-thread").start(); + Thread.sleep(100l); + } + + @Test + void whenUsingAsync_thenUsesCommonPool() { + CompletableFuture name = CompletableFuture.supplyAsync(() -> "Baeldung"); + + CompletableFuture nameLength = name.thenApplyAsync(value -> { + printCurrentThread(); + return value.length(); + }); + + assertThat(nameLength).isCompletedWithValue(8); + } + + @Test + void whenUsingAsync_thenUsesCustomExecutor() { + Executor testExecutor = Executors.newFixedThreadPool(5); + CompletableFuture name = CompletableFuture.supplyAsync(() -> "Baeldung"); + + CompletableFuture nameLength = name.thenApplyAsync(value -> { + printCurrentThread(); + return value.length(); + }, testExecutor); + + assertThat(nameLength).isCompletedWithValue(8); + } + + @Test + void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() { + CompletableFuture name = CustomCompletableFuture.supplyAsync(() -> "Baeldung"); + + CompletableFuture nameLength = name.thenApplyAsync(value -> { + printCurrentThread(); + return value.length(); + }); + + assertThat(nameLength).isCompletedWithValue(8); + } + + private static void printCurrentThread() { + System.out.println(Thread.currentThread().getName()); + } +} From 09c62a2d700bc7b92e110742c676812ae6a28d57 Mon Sep 17 00:00:00 2001 From: emanueltrandafir1993 Date: Sat, 1 Jul 2023 13:33:40 +0200 Subject: [PATCH 2/2] BAEL-6579: fixing tests --- .../CompletableFutureThreadPoolUnitTest.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java b/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java index 4f94f36131..9deadfb906 100644 --- a/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java +++ b/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/threadpool/CompletableFutureThreadPoolUnitTest.java @@ -1,8 +1,10 @@ package com.baeldung.concurrent.completablefuture.threadpool; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -11,7 +13,7 @@ import org.junit.jupiter.api.Test; public class CompletableFutureThreadPoolUnitTest { @Test - void whenUsingNonAsync_thenUsesMainThread() { + void whenUsingNonAsync_thenUsesMainThread() throws ExecutionException, InterruptedException { CompletableFuture name = CompletableFuture.supplyAsync(() -> "Baeldung"); CompletableFuture nameLength = name.thenApply(value -> { @@ -19,7 +21,7 @@ public class CompletableFutureThreadPoolUnitTest { return value.length(); }); - assertThat(nameLength).isCompletedWithValue(8); + assertThat(nameLength.get()).isEqualTo(8); } @Test @@ -32,7 +34,11 @@ public class CompletableFutureThreadPoolUnitTest { return value.length(); }); - assertThat(nameLength).isCompletedWithValue(8); + try { + assertThat(nameLength.get()).isEqualTo(8); + } catch (Exception e) { + fail(e.getMessage()); + } }; new Thread(test, "test-thread").start(); @@ -40,7 +46,7 @@ public class CompletableFutureThreadPoolUnitTest { } @Test - void whenUsingAsync_thenUsesCommonPool() { + void whenUsingAsync_thenUsesCommonPool() throws ExecutionException, InterruptedException { CompletableFuture name = CompletableFuture.supplyAsync(() -> "Baeldung"); CompletableFuture nameLength = name.thenApplyAsync(value -> { @@ -48,11 +54,11 @@ public class CompletableFutureThreadPoolUnitTest { return value.length(); }); - assertThat(nameLength).isCompletedWithValue(8); + assertThat(nameLength.get()).isEqualTo(8); } @Test - void whenUsingAsync_thenUsesCustomExecutor() { + void whenUsingAsync_thenUsesCustomExecutor() throws ExecutionException, InterruptedException { Executor testExecutor = Executors.newFixedThreadPool(5); CompletableFuture name = CompletableFuture.supplyAsync(() -> "Baeldung"); @@ -61,11 +67,11 @@ public class CompletableFutureThreadPoolUnitTest { return value.length(); }, testExecutor); - assertThat(nameLength).isCompletedWithValue(8); + assertThat(nameLength.get()).isEqualTo(8); } @Test - void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() { + void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() throws ExecutionException, InterruptedException { CompletableFuture name = CustomCompletableFuture.supplyAsync(() -> "Baeldung"); CompletableFuture nameLength = name.thenApplyAsync(value -> { @@ -73,7 +79,7 @@ public class CompletableFutureThreadPoolUnitTest { return value.length(); }); - assertThat(nameLength).isCompletedWithValue(8); + assertThat(nameLength.get()).isEqualTo(8); } private static void printCurrentThread() {