Add cache deadlock test

This commit adds a unit test for a deadlock issue that existed prior to
commit 1d0b93f766. While commit
1d0b93f766 seems to have addressed the
deadlock issue it would be more robust to have a unit test for it and a
unit test will reduce the risk that future maintenance on Cache will
reintroduce the deadlock issue. This test reliably fails prior to but
passes after commit 1d0b93f766.
This commit is contained in:
Jason Tedor 2015-10-28 08:30:22 -04:00
parent 1672bcc21c
commit baf361f1f9
1 changed files with 90 additions and 2 deletions

View File

@ -22,11 +22,14 @@ package org.elasticsearch.common.cache;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.instanceOf;
@ -502,6 +505,91 @@ public class CacheTests extends ESTestCase {
}
}
public void testDependentKeyDeadlock() throws InterruptedException {
class Key {
private final int key;
public Key(int key) {
this.key = key;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key1 = (Key) o;
return key == key1.key;
}
@Override
public int hashCode() {
return key % 2;
}
}
int numberOfThreads = randomIntBetween(2, 256);
final Cache<Key, Integer> cache = CacheBuilder.<Key, Integer>builder().build();
CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> {
Random random = new Random(random().nextLong());
latch.countDown();
for (int j = 0; j < numberOfEntries; j++) {
Key key = new Key(random.nextInt(numberOfEntries));
try {
cache.computeIfAbsent(key, k -> k.key != 0 ? cache.get(new Key(k.key / 2)) : 0);
} catch (ExecutionException e) {
fail(e.getMessage());
}
}
// successfully avoided deadlock, release the main thread
deadlockLatch.countDown();
});
threads.add(thread);
thread.start();
}
AtomicBoolean deadlock = new AtomicBoolean();
assert !deadlock.get();
// start a watchdog service
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
Set<Long> ids = threads.stream().map(t -> t.getId()).collect(Collectors.toSet());
ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = mxBean.findDeadlockedThreads();
if (!deadlock.get() && deadlockedThreads != null) {
for (long deadlockedThread : deadlockedThreads) {
// ensure that we detected deadlock on our threads
if (ids.contains(deadlockedThread)) {
deadlock.set(true);
// release the main test thread to fail the test
for (int i = 0; i < numberOfThreads; i++) {
deadlockLatch.countDown();
}
break;
}
}
}
}, 1, 1, TimeUnit.SECONDS);
// everything is setup, release the hounds
latch.countDown();
// wait for either deadlock to be detected or the threads to terminate
deadlockLatch.await();
// shutdown the watchdog service
scheduler.shutdown();
assertFalse("deadlock", deadlock.get());
}
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
public void testTorture() throws InterruptedException {