From 080d55a3933776366cdcfe61eb0e2bae96649234 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Tue, 22 Nov 2016 13:24:15 -0500 Subject: [PATCH] Rethrow ExecutionException from the loader to concurrent callers of Cache#computeIfAbsent This commit clarifies the contract of Cache#computeIfAbsent so that an exception that occurs during the execution of the loader is thrown to all callers. Prior to this commit, the first caller would get the ExecutionException and other callers that called during the load execution would get null, which is confusing. --- .../org/elasticsearch/common/cache/Cache.java | 14 +++++-- .../common/cache/CacheTests.java | 37 +++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/cache/Cache.java b/core/src/main/java/org/elasticsearch/common/cache/Cache.java index cf8b58d0271..fd83f5f1494 100644 --- a/core/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/core/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -346,11 +346,14 @@ public class Cache { * value using the given mapping function and enters it into this map unless null. The load method for a given key * will be invoked at most once. * + * Use of different {@link CacheLoader} implementations on the same key concurrently may result in only the first + * loader function being called and the second will be returned the result provided by the first including any exceptions + * thrown during the execution of the first. + * * @param key the key whose associated value is to be returned or computed for if non-existent * @param loader the function to compute a value given a key - * @return the current (existing or computed) value associated with the specified key, or null if the computed - * value is null - * @throws ExecutionException thrown if loader throws an exception + * @return the current (existing or computed) non-null value associated with the specified key + * @throws ExecutionException thrown if loader throws an exception or returns a null value */ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionException { long now = now(); @@ -410,6 +413,11 @@ public class Cache { try { value = completableValue.get(); + // check to ensure the future hasn't been completed with an exception + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); + } } catch (InterruptedException e) { throw new IllegalStateException(e); } diff --git a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java index d8dbaa673a0..d7bb90f6c00 100644 --- a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -706,6 +706,43 @@ public class CacheTests extends ESTestCase { barrier.await(); } + public void testExceptionThrownDuringConcurrentComputeIfAbsent() throws BrokenBarrierException, InterruptedException { + int numberOfThreads = randomIntBetween(2, 32); + final Cache cache = CacheBuilder.builder().build(); + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + + final String key = randomAsciiOfLengthBetween(2, 32); + for (int i = 0; i < numberOfThreads; i++) { + Thread thread = new Thread(() -> { + try { + barrier.await(); + for (int j = 0; j < numberOfEntries; j++) { + try { + String value = cache.computeIfAbsent(key, k -> { + throw new RuntimeException("failed to load"); + }); + fail("expected exception but got: " + value); + } catch (ExecutionException e) { + assertNotNull(e.getCause()); + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertEquals(e.getCause().getMessage(), "failed to load"); + } + } + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); + } + }); + thread.start(); + } + + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); + } + // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key // here be dragons: this test did catch one subtle bug during development; do not remove lightly public void testTorture() throws BrokenBarrierException, InterruptedException {