From 557b11cc2b7577f1d34a7f650f14b0d9f91f7107 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 6 Jan 2016 14:20:30 -0500 Subject: [PATCH] Sychronize threads in cluster state update task execution ordering test This commit uses a CyclicBarrier to correctly and simply sychronize the driver and test threads in ClusterServiceIT#testClusterStateUpdateTasksAreExecutedInOrder. --- .../cluster/ClusterServiceIT.java | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index e87a353649e..a334010c071 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -51,9 +51,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -798,7 +800,7 @@ public class ClusterServiceIT extends ESIntegTestCase { // test that for a single thread, tasks are executed in the order // that they are submitted - public void testClusterStateUpdateTasksAreExecutedInOrder() throws InterruptedException { + public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException { Settings settings = settingsBuilder() .put("discovery.type", "local") .build(); @@ -845,34 +847,28 @@ public class ClusterServiceIT extends ESIntegTestCase { } }; - CountDownLatch startGate = new CountDownLatch(1); - CountDownLatch endGate = new CountDownLatch(numberOfThreads); - AtomicBoolean interrupted = new AtomicBoolean(); + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { final int index = i; Thread thread = new Thread(() -> { - for (int j = 0; j < tasksSubmittedPerThread; j++) { - try { - try { - startGate.await(); - } catch (InterruptedException e) { - interrupted.set(true); - return; - } + try { + barrier.await(); + for (int j = 0; j < tasksSubmittedPerThread; j++) { clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener); - } finally { - endGate.countDown(); } + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); } }); thread.start(); } - startGate.countDown(); - endGate.await(); - - assertFalse(interrupted.get()); + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); updateLatch.await();