From c343ce694a460e285af3558538b2129e60358f4a Mon Sep 17 00:00:00 2001 From: Jason Date: Sun, 27 Jun 2021 22:33:38 +0530 Subject: [PATCH] BAEL-3759: Guava's Futures and ListenableFuture * Added new module guava-modules/guava-concurrency * Added implementation code for simple usages of ListenableFuture * Added implementation code for complex usages of ListenableFuture --- guava-modules/guava-concurrency/pom.xml | 14 + .../guava/future/ListenableFutureService.java | 105 +++++++ .../exception/ListenableFutureException.java | 4 + .../ListenableFutureComplexUnitTest.java | 277 ++++++++++++++++++ .../ListenableFutureSimpleUnitTest.java | 122 ++++++++ guava-modules/pom.xml | 1 + 6 files changed, 523 insertions(+) create mode 100644 guava-modules/guava-concurrency/pom.xml create mode 100644 guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/ListenableFutureService.java create mode 100644 guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/exception/ListenableFutureException.java create mode 100644 guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureComplexUnitTest.java create mode 100644 guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureSimpleUnitTest.java diff --git a/guava-modules/guava-concurrency/pom.xml b/guava-modules/guava-concurrency/pom.xml new file mode 100644 index 0000000000..ef7f756596 --- /dev/null +++ b/guava-modules/guava-concurrency/pom.xml @@ -0,0 +1,14 @@ + + + + guava-modules + com.baeldung + 0.0.1-SNAPSHOT + + 4.0.0 + + guava-concurrency + + \ No newline at end of file diff --git a/guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/ListenableFutureService.java b/guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/ListenableFutureService.java new file mode 100644 index 0000000000..b6620bd1e2 --- /dev/null +++ b/guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/ListenableFutureService.java @@ -0,0 +1,105 @@ +package com.baeldung.guava.future; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.baeldung.guava.future.exception.ListenableFutureException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class ListenableFutureService { + + private final ListeningExecutorService lExecService; + + public ListenableFutureService() { + this.lExecService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + } + + public ListenableFutureService(ListeningExecutorService lExecService) { + this.lExecService = lExecService; + } + + public ListenableFuture fetchConfig(String configKey) { + return lExecService.submit(() -> { + TimeUnit.MILLISECONDS.sleep(500); + return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE)); + }); + } + + public FutureTask fetchConfigTask(String configKey) { + return new FutureTask<>(() -> { + TimeUnit.MILLISECONDS.sleep(500); + return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE)); + }); + } + + public ListenableFutureTask fetchConfigListenableTask(String configKey) { + return ListenableFutureTask.create(() -> { + TimeUnit.MILLISECONDS.sleep(500); + return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE)); + }); + } + + public ListenableFuture succeedingTask() { + return Futures.immediateFuture(new Random().nextInt(Integer.MAX_VALUE)); + } + + public ListenableFuture failingTask() { + return Futures.immediateFailedFuture(new ListenableFutureException()); + } + + public ListenableFuture getCartId() { + return lExecService.submit(() -> { + TimeUnit.MILLISECONDS.sleep(500); + return new Random().nextInt(Integer.MAX_VALUE); + }); + } + + public ListenableFuture getCustomerName() { + String[] names = new String[] { "Mark", "Jane", "June" }; + return lExecService.submit(() -> { + TimeUnit.MILLISECONDS.sleep(500); + return names[new Random().nextInt(names.length)]; + }); + } + + public ListenableFuture> getCartItems() { + String[] items = new String[] { "Apple", "Orange", "Mango", "Pineapple" }; + return lExecService.submit(() -> { + TimeUnit.MILLISECONDS.sleep(500); + + int noOfItems = new Random().nextInt(items.length); + if (noOfItems == 0) ++noOfItems; + + return Arrays.stream(items, 0, noOfItems).collect(Collectors.toList()); + }); + } + + public ListenableFuture generateUsername(String firstName) { + return lExecService.submit(() -> { + TimeUnit.MILLISECONDS.sleep(500); + return firstName.replaceAll("[^a-zA-Z]+","") + .concat("@service.com"); + }); + } + + public ListenableFuture generatePassword(String username) { + return lExecService.submit(() -> { + TimeUnit.MILLISECONDS.sleep(500); + if (username.contains("@")) { + String[] parts = username.split("@"); + return parts[0] + "123@" + parts[1]; + } else { + return username + "123"; + } + }); + } +} \ No newline at end of file diff --git a/guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/exception/ListenableFutureException.java b/guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/exception/ListenableFutureException.java new file mode 100644 index 0000000000..921c02b54a --- /dev/null +++ b/guava-modules/guava-concurrency/src/main/java/com/baeldung/guava/future/exception/ListenableFutureException.java @@ -0,0 +1,4 @@ +package com.baeldung.guava.future.exception; + +public class ListenableFutureException extends Exception { +} diff --git a/guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureComplexUnitTest.java b/guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureComplexUnitTest.java new file mode 100644 index 0000000000..27a1cc6592 --- /dev/null +++ b/guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureComplexUnitTest.java @@ -0,0 +1,277 @@ +package com.baeldung.guava.future; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.UnaryOperator; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.jupiter.api.Test; + +import com.baeldung.guava.future.exception.ListenableFutureException; +import com.google.common.base.Function; +import com.google.common.util.concurrent.AsyncCallable; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class ListenableFutureComplexUnitTest { + + @Test + public void givenAllSucceedingTasks_whenAllAsList_thenAllSuccess() { + final ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService(); + final ListenableFutureService service = new ListenableFutureService(listeningExecService); + + ListenableFuture task1 = service.fetchConfig("config.0"); + ListenableFuture task2 = service.fetchConfig("config.1"); + ListenableFuture task3 = service.fetchConfig("config.2"); + + ListenableFuture> configsTask = Futures.allAsList(task1, task2, task3); + Futures.addCallback(configsTask, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List configResults) { + assertNotNull(configResults); + assertEquals(3, configResults.size()); + for (int i = 0; i < 3; i++) { + assertTrue(configResults.get(i) + .contains("config." + i)); + } + } + + @Override + public void onFailure(Throwable t) { + fail("Unexpected failure detected", t); + } + }, listeningExecService); + } + + @Test + public void givenOneFailingTask_whenAllAsList_thenFailure() { + final ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService(); + final ListenableFutureService service = new ListenableFutureService(listeningExecService); + + ListenableFuture task1 = service.fetchConfig("config.0"); + ListenableFuture task2 = service.failingTask(); + ListenableFuture task3 = service.fetchConfig("config.2"); + + ListenableFuture> configsTask = Futures.allAsList(task1, task2, task3); + Futures.addCallback(configsTask, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List configResults) { + fail("Expected a failed future"); + } + + @Override + public void onFailure(Throwable t) { + assertTrue(t instanceof ListenableFutureException); + } + }, listeningExecService); + } + + @Test + public void givenOneFailingTask_whenSuccessfulAsList_thenSomeSuccess() { + final ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService(); + final ListenableFutureService service = new ListenableFutureService(listeningExecService); + + ListenableFuture task1 = service.fetchConfig("config.0"); + ListenableFuture task2 = service.failingTask(); + ListenableFuture task3 = service.fetchConfig("config.2"); + + ListenableFuture> configsTask = Futures.successfulAsList(task1, task2, task3); + + Futures.addCallback(configsTask, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List configResults) { + assertNotNull(configResults); + assertTrue(configResults.get(0).contains("config.0")); + assertNull(configResults.get(1)); + assertTrue(configResults.get(2).contains("config.2")); + } + + @Override + public void onFailure(Throwable t) { + fail("Unexpected failure detected", t); + } + }, listeningExecService); + } + + @Test + public void givenAllSucceedingTasks_whenAllSucceed_thenSuccess() { + ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService(); + ListenableFutureService service = new ListenableFutureService(listeningExecService); + + ListenableFuture cartIdTask = service.getCartId(); + ListenableFuture customerNameTask = service.getCustomerName(); + ListenableFuture> cartItemsTask = service.getCartItems(); + + ListenableFuture cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask) + .call(() -> { + int cartId = Futures.getDone(cartIdTask); + String customerName = Futures.getDone(customerNameTask); + List cartItems = Futures.getDone(cartItemsTask); + return new CartInfo(cartId, customerName, cartItems); + }, listeningExecService); + + Futures.addCallback(cartInfoTask, new FutureCallback() { + @Override + public void onSuccess(@Nullable CartInfo result) { + assertNotNull(result); + assertTrue(result.cartId >= 0); + assertFalse(result.customerName.isEmpty()); + assertFalse(result.cartItems.isEmpty()); + } + + @Override + public void onFailure(Throwable t) { + fail("Unexpected failure detected", t); + } + }, listeningExecService); + } + + @Test + public void givenAllSucceedingTasks_whenAllComplete_thenSomeSuccess() { + ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService(); + ListenableFutureService service = new ListenableFutureService(listeningExecService); + + ListenableFuture cartIdTask = service.getCartId(); + ListenableFuture customerNameTask = service.failingTask(); + ListenableFuture> cartItemsTask = service.getCartItems(); + + ListenableFuture cartInfoTask = Futures.whenAllComplete(cartIdTask, customerNameTask, cartItemsTask) + .call(() -> { + Integer cartId = getOrNull(cartIdTask); + String customerName = getOrNull(customerNameTask); + List cartItems = getOrNull(cartItemsTask); + return new CartInfo(cartId, customerName, cartItems); + }, listeningExecService); + + Futures.addCallback(cartInfoTask, new FutureCallback() { + @Override + public void onSuccess(@Nullable CartInfo result) { + assertNotNull(result); + assertTrue(result.cartId >= 0); + assertNull(result.customerName); + assertFalse(result.cartItems.isEmpty()); + } + + @Override + public void onFailure(Throwable t) { + fail("Unexpected failure detected", t); + } + }, listeningExecService); + } + + @Test + public void whenTransform_thenTransformSuccess() { + ListeningExecutorService listenExecService = MoreExecutors.newDirectExecutorService(); + ListenableFutureService service = new ListenableFutureService(listenExecService); + + ListenableFuture> cartItemsTask = service.getCartItems(); + + Function, Integer> itemCountFunc = cartItems -> { + assertNotNull(cartItems); + return cartItems.size(); + }; + + ListenableFuture itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService); + + Futures.addCallback(itemCountTask, new FutureCallback() { + @Override + public void onSuccess(@Nullable Integer cartItemCount) { + assertNotNull(cartItemCount); + assertTrue(cartItemCount > 0); + } + + @Override + public void onFailure(Throwable t) { + fail("Unexpected failure detected", t); + } + }, listenExecService); + } + + @Test + public void whenSubmitAsync_thenSuccess() { + ListeningExecutorService executor = MoreExecutors.newDirectExecutorService(); + ListenableFutureService service = new ListenableFutureService(executor); + + AsyncCallable asyncConfigTask = () -> { + ListenableFuture configTask = service.fetchConfig("config.a"); + TimeUnit.MILLISECONDS.sleep(500); //some long running task + return configTask; + }; + + ListenableFuture configTask = Futures.submitAsync(asyncConfigTask, executor); + + Futures.addCallback(configTask, new FutureCallback() { + @Override + public void onSuccess(@Nullable String result) { + assertNotNull(result); + assertTrue(result.contains("config.a")); + } + + @Override + public void onFailure(Throwable t) { + fail("Unexpected failure detected", t); + } + }, executor); + } + + @Test + public void whenAsyncTransform_thenSuccess() { + ListeningExecutorService executor = MoreExecutors.newDirectExecutorService(); + ListenableFutureService service = new ListenableFutureService(executor); + + ListenableFuture usernameTask = service.generateUsername("john"); + AsyncFunction passwordFunc = username -> { + ListenableFuture generatePasswordTask = service.generatePassword(username); + TimeUnit.MILLISECONDS.sleep(500); // some long running task + return generatePasswordTask; + }; + + ListenableFuture passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor); + + Futures.addCallback(passwordTask, new FutureCallback() { + @Override + public void onSuccess(@Nullable String password) { + assertNotNull(password); + assertTrue(password.contains("john")); + assertTrue(password.contains("@")); + } + + @Override + public void onFailure(Throwable t) { + fail("Unexpected failure detected", t); + } + }, executor); + } + + private static T getOrNull(ListenableFuture future) { + try { + return Futures.getDone(future); + } catch (ExecutionException e) { + return null; + } + } + + static class CartInfo { + Integer cartId; + String customerName; + List cartItems; + + public CartInfo(Integer cartId, String customerName, List cartItems) { + this.cartId = cartId; + this.customerName = customerName; + this.cartItems = cartItems; + } + } +} \ No newline at end of file diff --git a/guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureSimpleUnitTest.java b/guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureSimpleUnitTest.java new file mode 100644 index 0000000000..7dce11a33f --- /dev/null +++ b/guava-modules/guava-concurrency/src/test/java/com/baeldung/guava/future/ListenableFutureSimpleUnitTest.java @@ -0,0 +1,122 @@ +package com.baeldung.guava.future; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import com.baeldung.guava.future.exception.ListenableFutureException; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class ListenableFutureSimpleUnitTest { + + @Test + public void whenSubmitToListeningExecutor_thenSuccess() throws ExecutionException, InterruptedException { + ExecutorService execService = Executors.newSingleThreadExecutor(); + ListeningExecutorService listeningExecService = MoreExecutors.listeningDecorator(execService); + + ListenableFuture asyncTask = listeningExecService.submit(() -> { + TimeUnit.MILLISECONDS.sleep(500); // long running task + return 5; + }); + + assertEquals(5, asyncTask.get()); + } + + @Test + public void + givenJavaExecutor_whenSubmitListeningTask_thenSuccess() throws ExecutionException, InterruptedException { + Executor executor = Executors.newSingleThreadExecutor(); + ListenableFutureService service = new ListenableFutureService(); + + FutureTask configFuture = service.fetchConfigTask("future.value"); + executor.execute(configFuture); + assertTrue(configFuture.get().contains("future.value")); + + ListenableFutureTask configListenableFuture = + service.fetchConfigListenableTask("listenable.value"); + executor.execute(configListenableFuture); + assertTrue(configListenableFuture.get().contains("listenable.value")); + } + + @Test + public void givenNonFailingTask_whenCallbackListen_thenSuccess() { + Executor listeningExecutor = MoreExecutors.directExecutor(); + + ListenableFuture succeedingTask = new ListenableFutureService().succeedingTask(); + Futures.addCallback(succeedingTask, new FutureCallback() { + @Override + public void onSuccess(Integer result) { + assertNotNull(result); + assertTrue(result >= 0); + } + + @Override + public void onFailure(Throwable t) { + fail("Succeeding task cannot failed", t); + } + }, listeningExecutor); + } + + @Test + public void givenFailingTask_whenCallbackListen_thenThrows() { + Executor listeningExecutor = MoreExecutors.directExecutor(); + + ListenableFuture failingTask = new ListenableFutureService().failingTask(); + Futures.addCallback(failingTask, new FutureCallback() { + @Override + public void onSuccess(Integer result) { + fail("Failing task cannot succeed"); + } + + @Override + public void onFailure(Throwable t) { + assertTrue(t instanceof ListenableFutureException); + } + }, listeningExecutor); + } + + @Test + public void givenNonFailingTask_whenDirectListen_thenListenerExecutes() { + Executor listeningExecutor = MoreExecutors.directExecutor(); + + int nextTask = 1; + Set runningTasks = ConcurrentHashMap.newKeySet(); + runningTasks.add(nextTask); + + ListenableFuture nonFailingTask = new ListenableFutureService().succeedingTask(); + nonFailingTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor); + + assertTrue(runningTasks.isEmpty()); + } + + @Test + public void givenFailingTask_whenDirectListen_thenListenerExecutes() { + final Executor listeningExecutor = MoreExecutors.directExecutor(); + + int nextTask = 1; + Set runningTasks = ConcurrentHashMap.newKeySet(); + runningTasks.add(nextTask); + + final ListenableFuture failingTask = new ListenableFutureService().failingTask(); + failingTask.addListener(() -> runningTasks.remove(nextTask),listeningExecutor); + + assertTrue(runningTasks.isEmpty()); + } +} \ No newline at end of file diff --git a/guava-modules/pom.xml b/guava-modules/pom.xml index 957b8ad166..8ffac98b51 100644 --- a/guava-modules/pom.xml +++ b/guava-modules/pom.xml @@ -24,6 +24,7 @@ guava-collections-list guava-collections-map guava-collections-set + guava-concurrency guava-io