Merge pull request #14334 from jasontedor/cache-deadlock-test
Add cache deadlock test
This commit is contained in:
commit
90528ce5af
|
@ -22,11 +22,14 @@ package org.elasticsearch.common.cache;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
|
||||
|
@ -502,6 +505,91 @@ public class CacheTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testDependentKeyDeadlock() throws InterruptedException {
|
||||
class Key {
|
||||
private final int key;
|
||||
|
||||
public Key(int key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Key key1 = (Key) o;
|
||||
|
||||
return key == key1.key;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return key % 2;
|
||||
}
|
||||
}
|
||||
|
||||
int numberOfThreads = randomIntBetween(2, 256);
|
||||
final Cache<Key, Integer> cache = CacheBuilder.<Key, Integer>builder().build();
|
||||
CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
|
||||
CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads);
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
Thread thread = new Thread(() -> {
|
||||
Random random = new Random(random().nextLong());
|
||||
latch.countDown();
|
||||
for (int j = 0; j < numberOfEntries; j++) {
|
||||
Key key = new Key(random.nextInt(numberOfEntries));
|
||||
try {
|
||||
cache.computeIfAbsent(key, k -> k.key != 0 ? cache.get(new Key(k.key / 2)) : 0);
|
||||
} catch (ExecutionException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
// successfully avoided deadlock, release the main thread
|
||||
deadlockLatch.countDown();
|
||||
});
|
||||
threads.add(thread);
|
||||
thread.start();
|
||||
}
|
||||
|
||||
AtomicBoolean deadlock = new AtomicBoolean();
|
||||
assert !deadlock.get();
|
||||
|
||||
// start a watchdog service
|
||||
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
scheduler.scheduleAtFixedRate(() -> {
|
||||
Set<Long> ids = threads.stream().map(t -> t.getId()).collect(Collectors.toSet());
|
||||
ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
|
||||
long[] deadlockedThreads = mxBean.findDeadlockedThreads();
|
||||
if (!deadlock.get() && deadlockedThreads != null) {
|
||||
for (long deadlockedThread : deadlockedThreads) {
|
||||
// ensure that we detected deadlock on our threads
|
||||
if (ids.contains(deadlockedThread)) {
|
||||
deadlock.set(true);
|
||||
// release the main test thread to fail the test
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
deadlockLatch.countDown();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 1, 1, TimeUnit.SECONDS);
|
||||
|
||||
// everything is setup, release the hounds
|
||||
latch.countDown();
|
||||
|
||||
// wait for either deadlock to be detected or the threads to terminate
|
||||
deadlockLatch.await();
|
||||
|
||||
// shutdown the watchdog service
|
||||
scheduler.shutdown();
|
||||
|
||||
assertFalse("deadlock", deadlock.get());
|
||||
}
|
||||
|
||||
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
|
||||
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
|
||||
public void testTorture() throws InterruptedException {
|
||||
|
|
Loading…
Reference in New Issue