Fix cache compute if absent for expired entries

When a cache entry expires, it remains in the cache (both the segment
that it belongs to, and the LRU list) until an eviction occurs. The
problem here is that the compute if absent implementation relies on
there not being an association to a key that we are trying to put
because it internally uses put if absent on the underlying segment. If
we try to put an association for a key that has expired but not been
evicted, then compute if absent will return as if there is nothing in
the cache for the given key, yet no call to compute if absent will
succeed in putting a new association for the key. To remedy this, we
modify the internal get method for the cache to let the caller take
action if the entry they are retrieving is expired. This allows the
compute if absent method to take the action of evicting the entry from
the cache, thus allowing the put if absent method used by compute if
absent to succeed for one of the callers trying to compute if absent a
new association.

Relates #26516
This commit is contained in:
Jason Tedor 2017-09-06 13:44:20 -04:00 committed by GitHub
parent ecf39bc0c1
commit 9c795bd838
2 changed files with 51 additions and 12 deletions

View File

@ -34,6 +34,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;
@ -195,14 +196,15 @@ public class Cache<K, V> {
/**
* get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
* pruned or a manual {@link Cache#refresh()} is performed
* pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
*
* @param key the key of the entry to get from the cache
* @param now the access time of this entry
* @param isExpired test if the entry is expired
* @param onExpiration a callback if the entry associated to the key is expired
* @return the entry if there was one, otherwise null
*/
Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired) {
Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = readLock.acquire()) {
@ -217,6 +219,10 @@ public class Cache<K, V> {
return ok;
} else {
segmentStats.miss();
if (ok != null) {
assert isExpired.test(ok);
onExpiration.accept(ok);
}
return null;
}
}).get();
@ -330,12 +336,12 @@ public class Cache<K, V> {
* @return the value to which the specified key is mapped, or null if this map contains no mapping for the key
*/
public V get(K key) {
return get(key, now());
return get(key, now(), e -> {});
}
private V get(K key, long now) {
private V get(K key, long now, Consumer<Entry<K, V>> onExpiration) {
CacheSegment<K, V> segment = getCacheSegment(key);
Entry<K, V> entry = segment.get(key, now, e -> isExpired(e, now));
Entry<K, V> entry = segment.get(key, now, e -> isExpired(e, now), onExpiration);
if (entry == null) {
return null;
} else {
@ -360,7 +366,12 @@ public class Cache<K, V> {
*/
public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionException {
long now = now();
V value = get(key, now);
// we have to eagerly evict expired entries or our putIfAbsent call below will fail
V value = get(key, now, e -> {
try (ReleasableLock ignored = lruLock.acquire()) {
evictEntry(e);
}
});
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
@ -691,15 +702,20 @@ public class Cache<K, V> {
assert lruLock.isHeldByCurrentThread();
while (tail != null && shouldPrune(tail, now)) {
CacheSegment<K, V> segment = getCacheSegment(tail.key);
Entry<K, V> entry = tail;
if (segment != null) {
segment.remove(tail.key);
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
evictEntry(tail);
}
}
private void evictEntry(Entry<K, V> entry) {
assert lruLock.isHeldByCurrentThread();
CacheSegment<K, V> segment = getCacheSegment(entry.key);
if (segment != null) {
segment.remove(entry.key);
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
}
private void delete(Entry<K, V> entry, RemovalNotification.RemovalReason removalReason) {
assert lruLock.isHeldByCurrentThread();

View File

@ -319,6 +319,29 @@ public class CacheTests extends ESTestCase {
}
}
public void testComputeIfAbsentAfterExpiration() throws ExecutionException {
AtomicLong now = new AtomicLong();
Cache<Integer, String> cache = new Cache<Integer, String>() {
@Override
protected long now() {
return now.get();
}
};
cache.setExpireAfterAccessNanos(1);
now.set(0);
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i) + "-first");
}
now.set(2);
for (int i = 0; i < numberOfEntries; i++) {
cache.computeIfAbsent(i, k -> Integer.toString(k) + "-second");
}
for (int i = 0; i < numberOfEntries; i++) {
assertEquals(i + "-second", cache.get(i));
}
assertEquals(numberOfEntries, cache.stats().getEvictions());
}
// randomly promote some entries, step the clock forward, then check that the promoted entries remain and the
// non-promoted entries were removed
public void testPromotion() {