From 4c0f5bda47164d5cbced314bf30951d398f8430b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 6 Jan 2016 15:07:05 -0500 Subject: [PATCH] Use CyclicBarriers for sychronizing driver and test threads This commit modifies some tests to use CyclicBarriers to correctly and simply sychronize driver and test threads. --- .../cluster/ClusterServiceIT.java | 25 ++--- .../common/cache/CacheTests.java | 99 +++++++++---------- 2 files changed, 55 insertions(+), 69 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index 5093e5c27f5..2d781c866de 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -886,7 +886,7 @@ public class ClusterServiceIT extends ESIntegTestCase { } } - public void testClusterStateBatchedUpdates() throws InterruptedException { + public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { Settings settings = settingsBuilder() .put("discovery.type", "local") .build(); @@ -974,19 +974,12 @@ public class ClusterServiceIT extends ESIntegTestCase { counts.merge(executor, 1, (previous, one) -> previous + one); } - CountDownLatch startGate = new CountDownLatch(1); - CountDownLatch endGate = new CountDownLatch(numberOfThreads); - AtomicBoolean interrupted = new AtomicBoolean(); + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { final int index = i; Thread thread = new Thread(() -> { try { - try { - startGate.await(); - } catch (InterruptedException e) { - interrupted.set(true); - return; - } + barrier.await(); for (int j = 0; j < tasksSubmittedPerThread; j++) { ClusterStateTaskExecutor executor = assignments.get(index * tasksSubmittedPerThread + j); clusterService.submitStateUpdateTask( @@ -996,16 +989,18 @@ public class ClusterServiceIT extends ESIntegTestCase { executor, listener); } - } finally { - endGate.countDown(); + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); } }); thread.start(); } - startGate.countDown(); - endGate.await(); - assertFalse(interrupted.get()); + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); // wait until all the cluster state updates have been processed updateLatch.await(); diff --git a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java index 0985bc4b88e..a397b75a6aa 100644 --- a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -31,7 +31,9 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -491,25 +493,19 @@ public class CacheTests extends ESTestCase { } } - public void testComputeIfAbsentCallsOnce() throws InterruptedException { + public void testComputeIfAbsentCallsOnce() throws BrokenBarrierException, InterruptedException { int numberOfThreads = randomIntBetween(2, 32); final Cache cache = CacheBuilder.builder().build(); AtomicReferenceArray flags = new AtomicReferenceArray(numberOfEntries); for (int j = 0; j < numberOfEntries; j++) { flags.set(j, false); } - CountDownLatch startGate = new CountDownLatch(1); - CountDownLatch endGate = new CountDownLatch(numberOfThreads); - AtomicBoolean interrupted = new AtomicBoolean(); + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { Thread thread = new Thread(() -> { try { - try { - startGate.await(); - } catch (InterruptedException e) { - interrupted.set(true); - return; - } + barrier.await(); for (int j = 0; j < numberOfEntries; j++) { try { cache.computeIfAbsent(j, key -> { @@ -517,18 +513,21 @@ public class CacheTests extends ESTestCase { return Integer.toString(key); }); } catch (ExecutionException e) { - throw new RuntimeException(e); + fail(e.getMessage()); } } - } finally { - endGate.countDown(); + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); } }); thread.start(); } - startGate.countDown(); - endGate.await(); - assertFalse(interrupted.get()); + + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); } public void testComputeIfAbsentThrowsExceptionIfLoaderReturnsANullValue() { @@ -541,7 +540,7 @@ public class CacheTests extends ESTestCase { } } - public void testDependentKeyDeadlock() throws InterruptedException { + public void testDependentKeyDeadlock() throws BrokenBarrierException, InterruptedException { class Key { private final int key; @@ -568,18 +567,17 @@ public class CacheTests extends ESTestCase { int numberOfThreads = randomIntBetween(2, 32); final Cache cache = CacheBuilder.builder().build(); - CountDownLatch startGate = new CountDownLatch(1); + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads); - AtomicBoolean interrupted = new AtomicBoolean(); List threads = new ArrayList<>(); for (int i = 0; i < numberOfThreads; i++) { Thread thread = new Thread(() -> { try { try { - startGate.await(); - } catch (InterruptedException e) { - interrupted.set(true); - return; + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); } Random random = new Random(random().nextLong()); for (int j = 0; j < numberOfEntries; j++) { @@ -631,7 +629,7 @@ public class CacheTests extends ESTestCase { }, 1, 1, TimeUnit.SECONDS); // everything is setup, release the hounds - startGate.countDown(); + barrier.await(); // wait for either deadlock to be detected or the threads to terminate deadlockLatch.await(); @@ -642,21 +640,16 @@ public class CacheTests extends ESTestCase { assertFalse("deadlock", deadlock.get()); } - public void testCachePollution() throws InterruptedException { + public void testCachePollution() throws BrokenBarrierException, InterruptedException { int numberOfThreads = randomIntBetween(2, 32); final Cache cache = CacheBuilder.builder().build(); - CountDownLatch startGate = new CountDownLatch(1); - CountDownLatch endGate = new CountDownLatch(numberOfThreads); - AtomicBoolean interrupted = new AtomicBoolean(); + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + for (int i = 0; i < numberOfThreads; i++) { Thread thread = new Thread(() -> { try { - try { - startGate.await(); - } catch (InterruptedException e) { - interrupted.set(true); - return; - } + barrier.await(); Random random = new Random(random().nextLong()); for (int j = 0; j < numberOfEntries; j++) { Integer key = random.nextInt(numberOfEntries); @@ -686,21 +679,23 @@ public class CacheTests extends ESTestCase { cache.get(key); } } - } finally { - endGate.countDown(); + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); } }); thread.start(); } - startGate.countDown(); - endGate.await(); - assertFalse(interrupted.get()); + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); } // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key // here be dragons: this test did catch one subtle bug during development; do not remove lightly - public void testTorture() throws InterruptedException { + public void testTorture() throws BrokenBarrierException, InterruptedException { int numberOfThreads = randomIntBetween(2, 32); final Cache cache = CacheBuilder.builder() @@ -708,32 +703,28 @@ public class CacheTests extends ESTestCase { .weigher((k, v) -> 2) .build(); - CountDownLatch startGate = new CountDownLatch(1); - CountDownLatch endGate = new CountDownLatch(numberOfThreads); - AtomicBoolean interrupted = new AtomicBoolean(); + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { Thread thread = new Thread(() -> { try { - try { - startGate.await(); - } catch (InterruptedException e) { - interrupted.set(true); - return; - } + barrier.await(); Random random = new Random(random().nextLong()); for (int j = 0; j < numberOfEntries; j++) { Integer key = random.nextInt(numberOfEntries); cache.put(key, Integer.toString(j)); } - } finally { - endGate.countDown(); + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); } }); thread.start(); } - startGate.countDown(); - endGate.await(); - assertFalse(interrupted.get()); + + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); cache.refresh(); assertEquals(500, cache.count());