diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index a7417b4362f..5a59341ef35 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -762,6 +763,11 @@ public class ClusterServiceIT extends ESIntegTestCase { } } + int numberOfThreads = randomIntBetween(2, 8); + int tasksSubmittedPerThread = randomIntBetween(1, 1024); + int numberOfExecutors = Math.max(1, numberOfThreads / 4); + final Semaphore semaphore = new Semaphore(numberOfExecutors); + class TaskExecutor implements ClusterStateTaskExecutor { private AtomicInteger counter = new AtomicInteger(); private AtomicInteger batches = new AtomicInteger(); @@ -775,6 +781,7 @@ public class ClusterServiceIT extends ESIntegTestCase { if (randomBoolean()) { maybeUpdatedClusterState = ClusterState.builder(currentState).build(); batches.incrementAndGet(); + semaphore.acquire(); } return BatchResult.builder().successes(tasks).build(maybeUpdatedClusterState); } @@ -787,10 +794,9 @@ public class ClusterServiceIT extends ESIntegTestCase { @Override public void clusterStatePublished(ClusterState newClusterState) { published.incrementAndGet(); + semaphore.release(); } } - int numberOfThreads = randomIntBetween(2, 8); - int tasksSubmittedPerThread = randomIntBetween(1, 1024); ConcurrentMap counters = new ConcurrentHashMap<>(); CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); @@ -807,7 +813,6 @@ public class ClusterServiceIT extends ESIntegTestCase { } }; - int numberOfExecutors = Math.max(1, numberOfThreads / 4); List executors = new ArrayList<>(); for (int i = 0; i < numberOfExecutors; i++) { executors.add(new TaskExecutor()); @@ -853,6 +858,8 @@ public class ClusterServiceIT extends ESIntegTestCase { // wait until all the cluster state updates have been processed updateLatch.await(); + // and until all of the publication callbacks have completed + semaphore.acquire(numberOfExecutors); // assert the number of executed tasks is correct assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get());