Merge pull request #14315 from jasontedor/failed-cache-loads

Fix issues with failed cache loads
This commit is contained in:
Jason Tedor 2015-10-28 14:29:26 -04:00
commit c1156283c7
2 changed files with 151 additions and 45 deletions

View File

@ -25,12 +25,11 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.ToLongBiFunction;
/**
@ -175,7 +174,7 @@ public class Cache<K, V> {
ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());
Map<K, Future<Entry<K, V>>> map = new HashMap<>();
Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();
SegmentStats segmentStats = new SegmentStats();
@ -187,20 +186,28 @@ public class Cache<K, V> {
* @return the entry if there was one, otherwise null
*/
Entry<K, V> get(K key, long now) {
Future<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = readLock.acquire()) {
future = map.get(key);
}
if (future != null) {
segmentStats.hit();
try {
entry = future.get();
entry.accessTime = now;
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("future should be a completedFuture for which get should not throw", e);
}
} else {
try {
entry = future.handle((ok, ex) -> {
if (ok != null) {
segmentStats.hit();
ok.accessTime = now;
return ok;
} else {
segmentStats.miss();
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
else {
segmentStats.miss();
}
return entry;
@ -216,11 +223,19 @@ public class Cache<K, V> {
*/
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
Entry<K, V> entry = new Entry<>(key, value, now);
Entry<K, V> existing;
Entry<K, V> existing = null;
try (ReleasableLock ignored = writeLock.acquire()) {
try {
Future<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
existing = future != null ? future.get() : null;
CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
if (future != null) {
existing = future.handle((ok, ex) -> {
if (ok != null) {
return ok;
} else {
return null;
}
}).get();
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("future should be a completedFuture for which get should not throw", e);
}
@ -235,17 +250,23 @@ public class Cache<K, V> {
* @return the removed entry if there was one, otherwise null
*/
Entry<K, V> remove(K key) {
Future<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> future;
Entry<K, V> entry = null;
try (ReleasableLock ignored = writeLock.acquire()) {
future = map.remove(key);
}
if (future != null) {
segmentStats.eviction();
try {
entry = future.get();
entry = future.handle((ok, ex) -> {
if (ok != null) {
segmentStats.eviction();
return ok;
} else {
return null;
}
}).get();
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("future should be a completedFuture for which get should not throw", e);
throw new IllegalStateException(e);
}
}
return entry;
@ -327,39 +348,57 @@ public class Cache<K, V> {
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
Future<Entry<K, V>> future;
FutureTask<Entry<K, V>> task = new FutureTask<>(() -> new Entry<>(key, loader.load(key), now));
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, task);
}
if (future == null) {
future = task;
task.run();
future = segment.map.putIfAbsent(key, completableFuture);
}
Entry<K, V> entry;
try {
entry = future.get();
} catch (ExecutionException | InterruptedException e) {
// if the future ended exceptionally, we do not want to pollute the cache
// however, we have to take care to ensure that the polluted entry has not already been replaced
try (ReleasableLock ignored = segment.writeLock.acquire()) {
Future<Entry<K, V>> sanity = segment.map.get(key);
try {
sanity.get();
} catch (ExecutionException | InterruptedException gotcha) {
segment.map.remove(key);
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
throw (e instanceof ExecutionException) ? (ExecutionException)e : new ExecutionException(e);
};
CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
if (entry.value == null) {
throw new ExecutionException(new NullPointerException("loader returned a null value"));
try {
value = completableValue.get();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
try (ReleasableLock ignored = lruLock.acquire()) {
promote(entry, now);
}
value = entry.value;
}
return value;
}

View File

@ -463,6 +463,25 @@ public class CacheTests extends ESTestCase {
assertEquals(replacements, notifications);
}
public void testComputeIfAbsentLoadsSuccessfully() {
Map<Integer, Integer> map = new HashMap<>();
Cache<Integer, Integer> cache = CacheBuilder.<Integer, Integer>builder().build();
for (int i = 0; i < numberOfEntries; i++) {
try {
cache.computeIfAbsent(i, k -> {
int value = randomInt();
map.put(k, value);
return value;
});
} catch (ExecutionException e) {
fail(e.getMessage());
}
}
for (int i = 0; i < numberOfEntries; i++) {
assertEquals(map.get(i), cache.get(i));
}
}
public void testComputeIfAbsentCallsOnce() throws InterruptedException {
int numberOfThreads = randomIntBetween(2, 200);
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
@ -597,6 +616,54 @@ public class CacheTests extends ESTestCase {
assertFalse("deadlock", deadlock.get());
}
public void testCachePollution() throws InterruptedException {
int numberOfThreads = randomIntBetween(2, 200);
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> {
latch.countDown();
Random random = new Random(random().nextLong());
for (int j = 0; j < numberOfEntries; j++) {
Integer key = random.nextInt(numberOfEntries);
boolean first;
boolean second;
do {
first = random.nextBoolean();
second = random.nextBoolean();
} while (first && second);
if (first && !second) {
try {
cache.computeIfAbsent(key, k -> {
if (random.nextBoolean()) {
return Integer.toString(k);
} else {
throw new Exception("testCachePollution");
}
});
} catch (ExecutionException e) {
assertNotNull(e.getCause());
assertThat(e.getCause(), instanceOf(Exception.class));
assertEquals(e.getCause().getMessage(), "testCachePollution");
}
} else if (!first && second) {
cache.invalidate(key);
} else if (!first && !second) {
cache.get(key);
}
}
});
threads.add(thread);
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
}
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
public void testTorture() throws InterruptedException {