diff --git a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java index 4f64f0baca7..29e741e47ac 100644 --- a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -22,11 +22,14 @@ package org.elasticsearch.common.cache; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.instanceOf; @@ -502,6 +505,91 @@ public class CacheTests extends ESTestCase { } } + public void testDependentKeyDeadlock() throws InterruptedException { + class Key { + private final int key; + + public Key(int key) { + this.key = key; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Key key1 = (Key) o; + + return key == key1.key; + + } + + @Override + public int hashCode() { + return key % 2; + } + } + + int numberOfThreads = randomIntBetween(2, 256); + final Cache cache = CacheBuilder.builder().build(); + CountDownLatch latch = new CountDownLatch(1 + numberOfThreads); + CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads); + List threads = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + Thread thread = new Thread(() -> { + Random random = new Random(random().nextLong()); + latch.countDown(); + for (int j = 0; j < numberOfEntries; j++) { + Key key = new Key(random.nextInt(numberOfEntries)); + try { + cache.computeIfAbsent(key, k -> k.key != 0 ? cache.get(new Key(k.key / 2)) : 0); + } catch (ExecutionException e) { + fail(e.getMessage()); + } + } + // successfully avoided deadlock, release the main thread + deadlockLatch.countDown(); + }); + threads.add(thread); + thread.start(); + } + + AtomicBoolean deadlock = new AtomicBoolean(); + assert !deadlock.get(); + + // start a watchdog service + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(() -> { + Set ids = threads.stream().map(t -> t.getId()).collect(Collectors.toSet()); + ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); + long[] deadlockedThreads = mxBean.findDeadlockedThreads(); + if (!deadlock.get() && deadlockedThreads != null) { + for (long deadlockedThread : deadlockedThreads) { + // ensure that we detected deadlock on our threads + if (ids.contains(deadlockedThread)) { + deadlock.set(true); + // release the main test thread to fail the test + for (int i = 0; i < numberOfThreads; i++) { + deadlockLatch.countDown(); + } + break; + } + } + } + }, 1, 1, TimeUnit.SECONDS); + + // everything is setup, release the hounds + latch.countDown(); + + // wait for either deadlock to be detected or the threads to terminate + deadlockLatch.await(); + + // shutdown the watchdog service + scheduler.shutdown(); + + assertFalse("deadlock", deadlock.get()); + } + // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key // here be dragons: this test did catch one subtle bug during development; do not remove lightly public void testTorture() throws InterruptedException {