Sychronize threads in cluster state update task execution ordering test

This commit uses a CyclicBarrier to correctly and simply sychronize the
driver and test threads in
ClusterServiceIT#testClusterStateUpdateTasksAreExecutedInOrder.
This commit is contained in:
Jason Tedor 2016-01-06 14:20:30 -05:00
parent d1b4cf6778
commit 557b11cc2b
1 changed files with 14 additions and 18 deletions

View File

@ -51,9 +51,11 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -798,7 +800,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
// test that for a single thread, tasks are executed in the order // test that for a single thread, tasks are executed in the order
// that they are submitted // that they are submitted
public void testClusterStateUpdateTasksAreExecutedInOrder() throws InterruptedException { public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.type", "local") .put("discovery.type", "local")
.build(); .build();
@ -845,34 +847,28 @@ public class ClusterServiceIT extends ESIntegTestCase {
} }
}; };
CountDownLatch startGate = new CountDownLatch(1); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
CountDownLatch endGate = new CountDownLatch(numberOfThreads);
AtomicBoolean interrupted = new AtomicBoolean();
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
final int index = i; final int index = i;
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
for (int j = 0; j < tasksSubmittedPerThread; j++) { try {
try { barrier.await();
try { for (int j = 0; j < tasksSubmittedPerThread; j++) {
startGate.await();
} catch (InterruptedException e) {
interrupted.set(true);
return;
}
clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener); clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener);
} finally {
endGate.countDown();
} }
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
} }
}); });
thread.start(); thread.start();
} }
startGate.countDown(); // wait for all threads to be ready
endGate.await(); barrier.await();
// wait for all threads to finish
assertFalse(interrupted.get()); barrier.await();
updateLatch.await(); updateLatch.await();