Fix issues with failed cache loads

This commit fixes two issues that could arise when a loader throws an
exception during a load in Cache#computeIfAbsent.

The underlying issue is that if the loader throws an exception,
Cache#computeIfAbsent would attempt to remove the polluted entry from
the cache. However, this cleanup was performed outside of the segment
lock. This means another thread could race and expire the polluted
entry (leading to NullPointerExceptions) or retrieve the polluted entry
from the cache before the loading thread had a chance to clean it up
(leading to IllegalStateExceptions).
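
A minimal, self-contained sketch of that pollution window (not the
Elasticsearch code; the class name, the plain ConcurrentHashMap, and the
String keys are illustrative stand-ins for the segment internals):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;

    // A failed load leaves a "polluted" future in the shared map; until some
    // thread removes it, other readers observe the failure instead of a miss.
    public class PollutedEntrySketch {
        public static void main(String[] args) {
            ConcurrentMap<String, FutureTask<String>> map = new ConcurrentHashMap<>();

            // The loading thread wins the race and runs the loader, which throws.
            FutureTask<String> task = new FutureTask<>(() -> {
                throw new RuntimeException("load failed");
            });
            map.putIfAbsent("key", task);
            task.run();

            // A second thread that finds the entry before cleanup happens sees the
            // failure rather than an absent entry.
            FutureTask<String> polluted = map.get("key");
            try {
                polluted.get();
            } catch (ExecutionException | InterruptedException e) {
                System.out.println("observed polluted entry: " + e.getCause());
            }

            // The cleanup, performed later and outside any lock, arrives too late
            // for the reader above; this is the window the commit closes.
            map.remove("key", task);
        }
    }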

The solution to the underlying problem of correctly handling failed
cache loads is to check for failed loads in all places where entries are
retrieved from the map backing the segment. In such cases, we treat the
failed load as if there were no entry in the cache, and we clean up the
cache on a best-effort basis. All of this is done outside of the segment
lock to avoid reintroducing the deadlock that arose when loads were
executed under a segment lock.
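
As a rough sketch of that pattern (simplified types and a hypothetical
getIfPresent helper, not the actual Cache internals), the handle callback
on the entry's CompletableFuture turns an exceptionally completed load
into a plain miss and drops the stale mapping when it can:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class FailedLoadSketch {

        static final ConcurrentMap<String, CompletableFuture<String>> segmentMap = new ConcurrentHashMap<>();

        // A failed load is treated as "no entry": a normally completed future
        // yields its value; an exceptionally completed one yields null and is
        // removed from the map on a best-effort basis.
        static String getIfPresent(String key) {
            CompletableFuture<String> future = segmentMap.get(key);
            if (future == null) {
                return null;
            }
            return future.handle((ok, ex) -> {
                if (ok != null) {
                    return ok;
                }
                // best-effort cleanup; losing this race to another thread is fine
                segmentMap.remove(key, future);
                return null;
            }).join();
        }

        public static void main(String[] args) {
            segmentMap.put("good", CompletableFuture.completedFuture("value"));
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new RuntimeException("loader blew up"));
            segmentMap.put("bad", failed);

            System.out.println(getIfPresent("good")); // prints "value"
            System.out.println(getIfPresent("bad"));  // prints "null", mapping removed
        }
    }

In this sketch a two-argument remove keeps the cleanup best-effort; the
change itself does the removal in computeIfAbsent, under the segment
write lock and guarded by isCompletedExceptionally, as the diff below
shows.
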
Jason Tedor 2015-10-27 12:57:37 -04:00
parent a6bbf73065
commit 06853209aa
2 changed files with 151 additions and 45 deletions


@@ -25,12 +25,11 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.ToLongBiFunction;
/**
@@ -175,7 +174,7 @@ public class Cache<K, V> {
ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());
Map<K, Future<Entry<K, V>>> map = new HashMap<>();
Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();
SegmentStats segmentStats = new SegmentStats();
@@ -187,20 +186,28 @@ public class Cache<K, V> {
* @return the entry if there was one, otherwise null
*/
Entry<K, V> get(K key, long now) {
Future<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = readLock.acquire()) {
future = map.get(key);
}
if (future != null) {
segmentStats.hit();
try {
entry = future.get();
entry.accessTime = now;
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("future should be a completedFuture for which get should not throw", e);
}
} else {
try {
entry = future.handle((ok, ex) -> {
if (ok != null) {
segmentStats.hit();
ok.accessTime = now;
return ok;
} else {
segmentStats.miss();
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
else {
segmentStats.miss();
}
return entry;
@@ -216,11 +223,19 @@ public class Cache<K, V> {
*/
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
Entry<K, V> entry = new Entry<>(key, value, now);
Entry<K, V> existing;
Entry<K, V> existing = null;
try (ReleasableLock ignored = writeLock.acquire()) {
try {
Future<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
existing = future != null ? future.get() : null;
CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
if (future != null) {
existing = future.handle((ok, ex) -> {
if (ok != null) {
return ok;
} else {
return null;
}
}).get();
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("future should be a completedFuture for which get should not throw", e);
}
@@ -235,17 +250,23 @@ public class Cache<K, V> {
* @return the removed entry if there was one, otherwise null
*/
Entry<K, V> remove(K key) {
Future<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = writeLock.acquire()) {
future = map.remove(key);
}
if (future != null) {
segmentStats.eviction();
try {
entry = future.get();
entry = future.handle((ok, ex) -> {
if (ok != null) {
segmentStats.eviction();
return ok;
} else {
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("future should be a completedFuture for which get should not throw", e);
throw new IllegalStateException(e);
}
}
return entry;
@@ -327,39 +348,57 @@ public class Cache<K, V> {
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
Future<Entry<K, V>> future;
FutureTask<Entry<K, V>> task = new FutureTask<>(() -> new Entry<>(key, loader.load(key), now));
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, task);
}
if (future == null) {
future = task;
task.run();
future = segment.map.putIfAbsent(key, completableFuture);
}
Entry<K, V> entry;
try {
entry = future.get();
} catch (ExecutionException | InterruptedException e) {
// if the future ended exceptionally, we do not want to pollute the cache
// however, we have to take care to ensure that the polluted entry has not already been replaced
try (ReleasableLock ignored = segment.writeLock.acquire()) {
Future<Entry<K, V>> sanity = segment.map.get(key);
try {
sanity.get();
} catch (ExecutionException | InterruptedException gotcha) {
segment.map.remove(key);
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
throw (e instanceof ExecutionException) ? (ExecutionException)e : new ExecutionException(e);
};
CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
if (entry.value == null) {
throw new ExecutionException(new NullPointerException("loader returned a null value"));
try {
value = completableValue.get();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
try (ReleasableLock ignored = lruLock.acquire()) {
promote(entry, now);
}
value = entry.value;
}
return value;
}

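Taken together, the heart of the new computeIfAbsent above is a
race-then-complete pattern: whoever wins putIfAbsent runs the loader with
no lock held and completes the shared future, and winner and losers alike
consume it through the same handler, so a failed load looks like a cache
miss. The sketch below reproduces that pattern in isolation with
simplified types and names; it is not the actual Cache code and it omits
the null-load check and LRU promotion shown in the diff.

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutionException;
    import java.util.function.BiFunction;

    public class ComputeIfAbsentSketch {

        static final ConcurrentMap<String, CompletableFuture<String>> map = new ConcurrentHashMap<>();

        static String computeIfAbsent(String key, Callable<String> loader) throws ExecutionException {
            CompletableFuture<String> completableFuture = new CompletableFuture<>();
            CompletableFuture<String> future = map.putIfAbsent(key, completableFuture);

            // Shared by winner and losers: a failed load is treated as absent and
            // the polluted mapping is removed on a best-effort basis.
            BiFunction<String, Throwable, String> handler = (ok, ex) -> {
                if (ok != null) {
                    return ok;
                }
                CompletableFuture<String> current = map.get(key);
                if (current != null && current.isCompletedExceptionally()) {
                    map.remove(key, current);
                }
                return null;
            };

            CompletableFuture<String> value;
            if (future == null) {
                // We won the race: run the loader with no lock held, then complete
                // the future for every waiter (the real Cache also rejects null
                // loads here; that check is omitted in this sketch).
                value = completableFuture.handle(handler);
                try {
                    completableFuture.complete(loader.call());
                } catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                    throw new ExecutionException(e);
                }
            } else {
                value = future.handle(handler);
            }
            try {
                return value.get();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }

A thread that merely observes another thread's failed load now sees a
miss rather than an IllegalStateException; the winning thread still
surfaces the loader's failure as an ExecutionException, which is what the
cache pollution test below exercises.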

@@ -463,6 +463,25 @@ public class CacheTests extends ESTestCase {
assertEquals(replacements, notifications);
}
public void testComputeIfAbsentLoadsSuccessfully() {
Map<Integer, Integer> map = new HashMap<>();
Cache<Integer, Integer> cache = CacheBuilder.<Integer, Integer>builder().build();
for (int i = 0; i < numberOfEntries; i++) {
try {
cache.computeIfAbsent(i, k -> {
int value = randomInt();
map.put(k, value);
return value;
});
} catch (ExecutionException e) {
fail(e.getMessage());
}
}
for (int i = 0; i < numberOfEntries; i++) {
assertEquals(map.get(i), cache.get(i));
}
}
public void testComputeIfAbsentCallsOnce() throws InterruptedException {
int numberOfThreads = randomIntBetween(2, 200);
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
@@ -597,6 +616,54 @@ public class CacheTests extends ESTestCase {
assertFalse("deadlock", deadlock.get());
}
public void testCachePollution() throws InterruptedException {
int numberOfThreads = randomIntBetween(2, 200);
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> {
latch.countDown();
Random random = new Random(random().nextLong());
for (int j = 0; j < numberOfEntries; j++) {
Integer key = random.nextInt(numberOfEntries);
boolean first;
boolean second;
do {
first = random.nextBoolean();
second = random.nextBoolean();
} while (first && second);
if (first && !second) {
try {
cache.computeIfAbsent(key, k -> {
if (random.nextBoolean()) {
return Integer.toString(k);
} else {
throw new Exception("testCachePollution");
}
});
} catch (ExecutionException e) {
assertNotNull(e.getCause());
assertThat(e.getCause(), instanceOf(Exception.class));
assertEquals(e.getCause().getMessage(), "testCachePollution");
}
} else if (!first && second) {
cache.invalidate(key);
} else if (!first && !second) {
cache.get(key);
}
}
});
threads.add(thread);
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
}
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
public void testTorture() throws InterruptedException {