diff --git a/core/src/main/java/org/elasticsearch/common/cache/Cache.java b/core/src/main/java/org/elasticsearch/common/cache/Cache.java index df30123c35b..91d011ba03c 100644 --- a/core/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/core/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -34,6 +34,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.ToLongBiFunction; @@ -195,14 +196,15 @@ public class Cache { /** * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is - * pruned or a manual {@link Cache#refresh()} is performed + * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback * * @param key the key of the entry to get from the cache * @param now the access time of this entry * @param isExpired test if the entry is expired + * @param onExpiration a callback if the entry associated to the key is expired * @return the entry if there was one, otherwise null */ - Entry get(K key, long now, Predicate> isExpired) { + Entry get(K key, long now, Predicate> isExpired, Consumer> onExpiration) { CompletableFuture> future; Entry entry = null; try (ReleasableLock ignored = readLock.acquire()) { @@ -217,6 +219,10 @@ public class Cache { return ok; } else { segmentStats.miss(); + if (ok != null) { + assert isExpired.test(ok); + onExpiration.accept(ok); + } return null; } }).get(); @@ -330,12 +336,12 @@ public class Cache { * @return the value to which the specified key is mapped, or null if this map contains no mapping for the key */ public V get(K key) { - return get(key, now()); + return get(key, now(), e -> {}); } - private V get(K key, long now) { + private V get(K key, long now, Consumer> onExpiration) { CacheSegment segment = getCacheSegment(key); - Entry entry = segment.get(key, now, e -> isExpired(e, now)); + Entry entry = segment.get(key, now, e -> isExpired(e, now), onExpiration); if (entry == null) { return null; } else { @@ -360,7 +366,12 @@ public class Cache { */ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionException { long now = now(); - V value = get(key, now); + // we have to eagerly evict expired entries or our putIfAbsent call below will fail + V value = get(key, now, e -> { + try (ReleasableLock ignored = lruLock.acquire()) { + evictEntry(e); + } + }); if (value == null) { // we need to synchronize loading of a value for a given key; however, holding the segment lock while // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we @@ -691,15 +702,20 @@ public class Cache { assert lruLock.isHeldByCurrentThread(); while (tail != null && shouldPrune(tail, now)) { - CacheSegment segment = getCacheSegment(tail.key); - Entry entry = tail; - if (segment != null) { - segment.remove(tail.key); - } - delete(entry, RemovalNotification.RemovalReason.EVICTED); + evictEntry(tail); } } + private void evictEntry(Entry entry) { + assert lruLock.isHeldByCurrentThread(); + + CacheSegment segment = getCacheSegment(entry.key); + if (segment != null) { + segment.remove(entry.key); + } + delete(entry, RemovalNotification.RemovalReason.EVICTED); + } + private void delete(Entry entry, RemovalNotification.RemovalReason removalReason) { assert lruLock.isHeldByCurrentThread(); diff --git a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java index 7dbaba02897..5675a7b524b 100644 --- a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -319,6 +319,29 @@ public class CacheTests extends ESTestCase { } } + public void testComputeIfAbsentAfterExpiration() throws ExecutionException { + AtomicLong now = new AtomicLong(); + Cache cache = new Cache() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfterAccessNanos(1); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i) + "-first"); + } + now.set(2); + for (int i = 0; i < numberOfEntries; i++) { + cache.computeIfAbsent(i, k -> Integer.toString(k) + "-second"); + } + for (int i = 0; i < numberOfEntries; i++) { + assertEquals(i + "-second", cache.get(i)); + } + assertEquals(numberOfEntries, cache.stats().getEvictions()); + } + // randomly promote some entries, step the clock forward, then check that the promoted entries remain and the // non-promoted entries were removed public void testPromotion() {