From 6fbb8980593821decdcccb8fc0a27bad2c1f2d64 Mon Sep 17 00:00:00 2001 From: "emanuel.trandafir" Date: Sat, 15 Jul 2023 19:53:51 +0200 Subject: [PATCH] BAEL-6578: CompletableFuture allOf.join() vs join() --- .../CompletableFutureAllOffUnitTest.java | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/allofvsjoin/CompletableFutureAllOffUnitTest.java diff --git a/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/allofvsjoin/CompletableFutureAllOffUnitTest.java b/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/allofvsjoin/CompletableFutureAllOffUnitTest.java new file mode 100644 index 0000000000..7ff0831c5a --- /dev/null +++ b/core-java-modules/core-java-concurrency-basic-3/src/test/java/com/baeldung/concurrent/completablefuture/allofvsjoin/CompletableFutureAllOffUnitTest.java @@ -0,0 +1,112 @@ +package com.baeldung.concurrent.completablefuture.allofvsjoin; + +import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.LocalDateTime; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; + +public class CompletableFutureAllOffUnitTest { + + @Test + void whenCallingJoin_thenBlocksThreadAndGetValue() { + CompletableFuture future = waitAndReturn(1_000, "Harry"); + assertEquals("Harry", future.join()); // waits one second + } + + @Test + void whenCallingJoin_thenBlocksThreadAndThrowException() { + CompletableFuture futureError = waitAndThrow(1_000); + assertThrows(RuntimeException.class, futureError::join); // waits one second + } + + @Test + void whenCallingJoinTwoTimes_thenBlocksThreadAndGetValues() { + CompletableFuture f1 = waitAndReturn(1_000, "Harry"); + CompletableFuture f2 = waitAndReturn(2_000, "Ron"); + + assertEquals("Harry", f1.join()); // waits for one second + assertEquals("Ron", f2.join()); // waits for one more second + } + + + @Test + void whenCallingAllOfJoin_thenBlocksThreadAndGetValues() { + CompletableFuture f1 = waitAndReturn(1_000, "Harry"); + CompletableFuture f2 = waitAndReturn(2_000, "Ron"); + + CompletableFuture combinedFutures = CompletableFuture.allOf(f1, f2); // does NOT wait + combinedFutures.join(); // waits for two seconds + + assertEquals("Harry", f1.join()); // does NOT wait + assertEquals("Ron", f2.join()); // does NOT wait + } + + @Test + void whenCallingJoinInaLoop_thenProcessesDataPartially() { + CompletableFuture f1 = waitAndReturn(1_000, "Harry"); + CompletableFuture f2 = waitAndThrow(2_000); + CompletableFuture f3 = waitAndReturn(1_000, "Ron"); + + assertThrows(RuntimeException.class, () -> Stream.of(f1, f2, f3) + .map(CompletableFuture::join) + .forEach(this::sayHello)); + } + + @Test + void whenCallingAllOfJoin_thenFailsForAll() { + CompletableFuture f1 = waitAndReturn(1_000, "Harry"); + CompletableFuture f2 = waitAndThrow(2_000); + CompletableFuture f3 = waitAndReturn(1_000, "Ron"); + + assertThrows(RuntimeException.class, () -> CompletableFuture.allOf(f1, f2, f3) + .join()); + } + + @Test + void whenCallingExceptionally_thenRecoversWithDefaultValue() { + CompletableFuture f1 = waitAndReturn(1_000, "Harry"); + CompletableFuture f2 = waitAndThrow(2_000); + CompletableFuture f3 = waitAndReturn(1_000, "Ron"); + + CompletableFuture names = CompletableFuture.allOf(f1, f2, f3) + .thenApply(__ -> f1.join() + "," + f2.join() + "," + f3.join()) + .exceptionally(err -> { + System.out.println("oops, there was a problem! " + err.getMessage()); + return "names not found!"; + }); + + assertEquals("names not found!", names.join()); + } + + private void sayHello(String name) { + System.out.println(LocalDateTime.now() + " - " + name); + } + + private CompletableFuture waitAndReturn(long millis, String value) { + return CompletableFuture.supplyAsync(() -> { + try { + // Thread.sleep() is commented to avoid slowing down the pipeline + // Thread.sleep(millis); + return value; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + private CompletableFuture waitAndThrow(long millis) { + return CompletableFuture.supplyAsync(() -> { + try { + // Thread.sleep() is commented to avoid slowing down the pipeline + // Thread.sleep(millis); + } finally { + throw new RuntimeException(); + } + }); + } + +}