BAEL-3759: Guava's Futures and ListenableFuture

* Added new module guava-modules/guava-concurrency
 * Added implementation code for simple usages of ListenableFuture
 * Added implementation code for complex usages of ListenableFuture
This commit is contained in:
Jason 2021-06-27 22:33:38 +05:30
parent fe8ec4460c
commit c343ce694a
6 changed files with 523 additions and 0 deletions

View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>guava-modules</artifactId>
<groupId>com.baeldung</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>guava-concurrency</artifactId>
</project>

View File

@ -0,0 +1,105 @@
package com.baeldung.guava.future;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.baeldung.guava.future.exception.ListenableFutureException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class ListenableFutureService {
private final ListeningExecutorService lExecService;
public ListenableFutureService() {
this.lExecService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
}
public ListenableFutureService(ListeningExecutorService lExecService) {
this.lExecService = lExecService;
}
public ListenableFuture<String> fetchConfig(String configKey) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
public FutureTask<String> fetchConfigTask(String configKey) {
return new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
return ListenableFutureTask.create(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
public ListenableFuture<Integer> succeedingTask() {
return Futures.immediateFuture(new Random().nextInt(Integer.MAX_VALUE));
}
public <T> ListenableFuture<T> failingTask() {
return Futures.immediateFailedFuture(new ListenableFutureException());
}
public ListenableFuture<Integer> getCartId() {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return new Random().nextInt(Integer.MAX_VALUE);
});
}
public ListenableFuture<String> getCustomerName() {
String[] names = new String[] { "Mark", "Jane", "June" };
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return names[new Random().nextInt(names.length)];
});
}
public ListenableFuture<List<String>> getCartItems() {
String[] items = new String[] { "Apple", "Orange", "Mango", "Pineapple" };
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
int noOfItems = new Random().nextInt(items.length);
if (noOfItems == 0) ++noOfItems;
return Arrays.stream(items, 0, noOfItems).collect(Collectors.toList());
});
}
public ListenableFuture<String> generateUsername(String firstName) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return firstName.replaceAll("[^a-zA-Z]+","")
.concat("@service.com");
});
}
public ListenableFuture<String> generatePassword(String username) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
if (username.contains("@")) {
String[] parts = username.split("@");
return parts[0] + "123@" + parts[1];
} else {
return username + "123";
}
});
}
}

View File

@ -0,0 +1,4 @@
package com.baeldung.guava.future.exception;
public class ListenableFutureException extends Exception {
}

View File

@ -0,0 +1,277 @@
package com.baeldung.guava.future;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.jupiter.api.Test;
import com.baeldung.guava.future.exception.ListenableFutureException;
import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class ListenableFutureComplexUnitTest {
@Test
public void givenAllSucceedingTasks_whenAllAsList_thenAllSuccess() {
final ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService();
final ListenableFutureService service = new ListenableFutureService(listeningExecService);
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
assertNotNull(configResults);
assertEquals(3, configResults.size());
for (int i = 0; i < 3; i++) {
assertTrue(configResults.get(i)
.contains("config." + i));
}
}
@Override
public void onFailure(Throwable t) {
fail("Unexpected failure detected", t);
}
}, listeningExecService);
}
@Test
public void givenOneFailingTask_whenAllAsList_thenFailure() {
final ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService();
final ListenableFutureService service = new ListenableFutureService(listeningExecService);
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.failingTask();
ListenableFuture<String> task3 = service.fetchConfig("config.2");
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
fail("Expected a failed future");
}
@Override
public void onFailure(Throwable t) {
assertTrue(t instanceof ListenableFutureException);
}
}, listeningExecService);
}
@Test
public void givenOneFailingTask_whenSuccessfulAsList_thenSomeSuccess() {
final ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService();
final ListenableFutureService service = new ListenableFutureService(listeningExecService);
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.failingTask();
ListenableFuture<String> task3 = service.fetchConfig("config.2");
ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
assertNotNull(configResults);
assertTrue(configResults.get(0).contains("config.0"));
assertNull(configResults.get(1));
assertTrue(configResults.get(2).contains("config.2"));
}
@Override
public void onFailure(Throwable t) {
fail("Unexpected failure detected", t);
}
}, listeningExecService);
}
@Test
public void givenAllSucceedingTasks_whenAllSucceed_thenSuccess() {
ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService();
ListenableFutureService service = new ListenableFutureService(listeningExecService);
ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
.call(() -> {
int cartId = Futures.getDone(cartIdTask);
String customerName = Futures.getDone(customerNameTask);
List<String> cartItems = Futures.getDone(cartItemsTask);
return new CartInfo(cartId, customerName, cartItems);
}, listeningExecService);
Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
@Override
public void onSuccess(@Nullable CartInfo result) {
assertNotNull(result);
assertTrue(result.cartId >= 0);
assertFalse(result.customerName.isEmpty());
assertFalse(result.cartItems.isEmpty());
}
@Override
public void onFailure(Throwable t) {
fail("Unexpected failure detected", t);
}
}, listeningExecService);
}
@Test
public void givenAllSucceedingTasks_whenAllComplete_thenSomeSuccess() {
ListeningExecutorService listeningExecService = MoreExecutors.newDirectExecutorService();
ListenableFutureService service = new ListenableFutureService(listeningExecService);
ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.failingTask();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllComplete(cartIdTask, customerNameTask, cartItemsTask)
.call(() -> {
Integer cartId = getOrNull(cartIdTask);
String customerName = getOrNull(customerNameTask);
List<String> cartItems = getOrNull(cartItemsTask);
return new CartInfo(cartId, customerName, cartItems);
}, listeningExecService);
Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
@Override
public void onSuccess(@Nullable CartInfo result) {
assertNotNull(result);
assertTrue(result.cartId >= 0);
assertNull(result.customerName);
assertFalse(result.cartItems.isEmpty());
}
@Override
public void onFailure(Throwable t) {
fail("Unexpected failure detected", t);
}
}, listeningExecService);
}
@Test
public void whenTransform_thenTransformSuccess() {
ListeningExecutorService listenExecService = MoreExecutors.newDirectExecutorService();
ListenableFutureService service = new ListenableFutureService(listenExecService);
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
Function<List<String>, Integer> itemCountFunc = cartItems -> {
assertNotNull(cartItems);
return cartItems.size();
};
ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);
Futures.addCallback(itemCountTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer cartItemCount) {
assertNotNull(cartItemCount);
assertTrue(cartItemCount > 0);
}
@Override
public void onFailure(Throwable t) {
fail("Unexpected failure detected", t);
}
}, listenExecService);
}
@Test
public void whenSubmitAsync_thenSuccess() {
ListeningExecutorService executor = MoreExecutors.newDirectExecutorService();
ListenableFutureService service = new ListenableFutureService(executor);
AsyncCallable<String> asyncConfigTask = () -> {
ListenableFuture<String> configTask = service.fetchConfig("config.a");
TimeUnit.MILLISECONDS.sleep(500); //some long running task
return configTask;
};
ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);
Futures.addCallback(configTask, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
assertNotNull(result);
assertTrue(result.contains("config.a"));
}
@Override
public void onFailure(Throwable t) {
fail("Unexpected failure detected", t);
}
}, executor);
}
@Test
public void whenAsyncTransform_thenSuccess() {
ListeningExecutorService executor = MoreExecutors.newDirectExecutorService();
ListenableFutureService service = new ListenableFutureService(executor);
ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
TimeUnit.MILLISECONDS.sleep(500); // some long running task
return generatePasswordTask;
};
ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);
Futures.addCallback(passwordTask, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String password) {
assertNotNull(password);
assertTrue(password.contains("john"));
assertTrue(password.contains("@"));
}
@Override
public void onFailure(Throwable t) {
fail("Unexpected failure detected", t);
}
}, executor);
}
private static <T> T getOrNull(ListenableFuture<T> future) {
try {
return Futures.getDone(future);
} catch (ExecutionException e) {
return null;
}
}
static class CartInfo {
Integer cartId;
String customerName;
List<String> cartItems;
public CartInfo(Integer cartId, String customerName, List<String> cartItems) {
this.cartId = cartId;
this.customerName = customerName;
this.cartItems = cartItems;
}
}
}

View File

@ -0,0 +1,122 @@
package com.baeldung.guava.future;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import com.baeldung.guava.future.exception.ListenableFutureException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class ListenableFutureSimpleUnitTest {
@Test
public void whenSubmitToListeningExecutor_thenSuccess() throws ExecutionException, InterruptedException {
ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService listeningExecService = MoreExecutors.listeningDecorator(execService);
ListenableFuture<Integer> asyncTask = listeningExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500); // long running task
return 5;
});
assertEquals(5, asyncTask.get());
}
@Test
public void
givenJavaExecutor_whenSubmitListeningTask_thenSuccess() throws ExecutionException, InterruptedException {
Executor executor = Executors.newSingleThreadExecutor();
ListenableFutureService service = new ListenableFutureService();
FutureTask<String> configFuture = service.fetchConfigTask("future.value");
executor.execute(configFuture);
assertTrue(configFuture.get().contains("future.value"));
ListenableFutureTask<String> configListenableFuture =
service.fetchConfigListenableTask("listenable.value");
executor.execute(configListenableFuture);
assertTrue(configListenableFuture.get().contains("listenable.value"));
}
@Test
public void givenNonFailingTask_whenCallbackListen_thenSuccess() {
Executor listeningExecutor = MoreExecutors.directExecutor();
ListenableFuture<Integer> succeedingTask = new ListenableFutureService().succeedingTask();
Futures.addCallback(succeedingTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
assertNotNull(result);
assertTrue(result >= 0);
}
@Override
public void onFailure(Throwable t) {
fail("Succeeding task cannot failed", t);
}
}, listeningExecutor);
}
@Test
public void givenFailingTask_whenCallbackListen_thenThrows() {
Executor listeningExecutor = MoreExecutors.directExecutor();
ListenableFuture<Integer> failingTask = new ListenableFutureService().failingTask();
Futures.addCallback(failingTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
fail("Failing task cannot succeed");
}
@Override
public void onFailure(Throwable t) {
assertTrue(t instanceof ListenableFutureException);
}
}, listeningExecutor);
}
@Test
public void givenNonFailingTask_whenDirectListen_thenListenerExecutes() {
Executor listeningExecutor = MoreExecutors.directExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
ListenableFuture<Integer> nonFailingTask = new ListenableFutureService().succeedingTask();
nonFailingTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);
assertTrue(runningTasks.isEmpty());
}
@Test
public void givenFailingTask_whenDirectListen_thenListenerExecutes() {
final Executor listeningExecutor = MoreExecutors.directExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
final ListenableFuture<Integer> failingTask = new ListenableFutureService().failingTask();
failingTask.addListener(() -> runningTasks.remove(nextTask),listeningExecutor);
assertTrue(runningTasks.isEmpty());
}
}

View File

@ -24,6 +24,7 @@
<module>guava-collections-list</module> <module>guava-collections-list</module>
<module>guava-collections-map</module> <module>guava-collections-map</module>
<module>guava-collections-set</module> <module>guava-collections-set</module>
<module>guava-concurrency</module>
<module>guava-io</module> <module>guava-io</module>
</modules> </modules>