diff --git a/core/src/main/java/org/elasticsearch/common/cache/Cache.java b/core/src/main/java/org/elasticsearch/common/cache/Cache.java index 709dbcec8ed..ae07f5d5289 100644 --- a/core/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/core/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -187,16 +187,13 @@ public class Cache { * @param key the key of the entry to add to the cache * @param value the value of the entry to add to the cache * @param now the access time of this entry - * @param onlyIfAbsent whether or not an existing entry should be replaced * @return a tuple of the new entry and the existing entry, if there was one otherwise null */ - Tuple, Entry> put(K key, V value, long now, boolean onlyIfAbsent) { + Tuple, Entry> put(K key, V value, long now) { Entry entry = new Entry<>(key, value, now); - Entry existing = null; + Entry existing; try (ReleasableLock ignored = writeLock.acquire()) { - if (!onlyIfAbsent || (onlyIfAbsent && map.get(key) == null)) { - existing = map.put(key, entry); - } + existing = map.put(key, entry); } return Tuple.tuple(entry, existing); } @@ -282,9 +279,15 @@ public class Cache { long now = now(); V value = get(key); if (value == null) { - value = mappingFunction.apply(key); - if (value != null) { - put(key, value, now, true); + CacheSegment segment = getCacheSegment(key); + try (ReleasableLock ignored = segment.writeLock.acquire()) { + value = get(key); + if (value == null) { + value = mappingFunction.apply(key); + } + if (value != null) { + put(key, value, now); + } } } return value; @@ -299,12 +302,12 @@ public class Cache { */ public void put(K key, V value) { long now = now(); - put(key, value, now, false); + put(key, value, now); } - private void put(K key, V value, long now, boolean onlyIfAbsent) { + private void put(K key, V value, long now) { CacheSegment segment = getCacheSegment(key); - Tuple, Entry> tuple = segment.put(key, value, now, onlyIfAbsent); + Tuple, Entry> tuple = segment.put(key, value, now); boolean replaced = false; try (ReleasableLock ignored = lock.acquire()) { if (tuple.v2() != null && tuple.v2().state == State.EXISTING) { diff --git a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java index 3934a1c2239..bb348b378f9 100644 --- a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -25,6 +25,7 @@ import org.junit.Before; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; import static org.hamcrest.Matchers.not; @@ -435,6 +436,31 @@ public class CacheTests extends ESTestCase { assertEquals(replacements, notifications); } + public void testComputeIfAbsentCallsOnce() throws InterruptedException { + int numberOfThreads = randomIntBetween(2, 200); + final Cache cache = CacheBuilder.builder().build(); + List threads = new ArrayList<>(); + AtomicReferenceArray flags = new AtomicReferenceArray(numberOfEntries); + for (int j = 0; j < numberOfEntries; j++) { + flags.set(j, false); + } + for (int i = 0; i < numberOfThreads; i++) { + Thread thread = new Thread(() -> { + for (int j = 0; j < numberOfEntries; j++) { + cache.computeIfAbsent(j, key -> { + assertTrue(flags.compareAndSet(key, false, true)); + return Integer.toString(key); + }); + } + }); + threads.add(thread); + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key // here be dragons: this test did catch one subtle bug during development; do not remove lightly public void testTorture() throws InterruptedException {