From 270b08b302ba64aa742b740f86be833b26f79b10 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 29 Dec 2015 13:10:56 -0500 Subject: [PATCH] Add test that cluster state update tasks are executed in order This commit adds a test that ensures that cluster state update tasks are executed in order from the perspective of a single thread. --- .../cluster/ClusterServiceIT.java | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index 6e7e338d8b9..9e0b65b6f79 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -796,6 +796,100 @@ public class ClusterServiceIT extends ESIntegTestCase { assertTrue(published.get()); } + // test that for a single thread, tasks are executed in the order + // that they are submitted + public void testClusterStateUpdateTasksAreExecutedInOrder() throws InterruptedException { + Settings settings = settingsBuilder() + .put("discovery.type", "local") + .build(); + internalCluster().startNode(settings); + ClusterService clusterService = internalCluster().getInstance(ClusterService.class); + + class TaskExecutor implements ClusterStateTaskExecutor { + int tracking = -1; + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + for (Integer task : tasks) { + try { + assertEquals("task was executed out of order", tracking + 1, (int)task); + tracking++; + } catch (AssertionError e) { + return BatchResult.builder().failures(tasks, e).build(currentState); + } + } + return BatchResult.builder().successes(tasks).build(ClusterState.builder(currentState).build()); + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + } + + int numberOfThreads = randomIntBetween(2, 8); + TaskExecutor[] executors = new TaskExecutor[numberOfThreads]; + for (int i = 0; i < numberOfThreads; i++) { + executors[i] = new TaskExecutor(); + } + + int tasksSubmittedPerThread = randomIntBetween(2, 1024); + + AtomicBoolean failure = new AtomicBoolean(); + CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); + + ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Throwable t) { + logger.debug("failure: [{}]", t, source); + failure.set(true); + updateLatch.countDown(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + updateLatch.countDown(); + } + }; + + CountDownLatch startGate = new CountDownLatch(1); + CountDownLatch endGate = new CountDownLatch(numberOfThreads); + AtomicBoolean interrupted = new AtomicBoolean(); + + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + Thread thread = new Thread(() -> { + for (int j = 0; j < tasksSubmittedPerThread; j++) { + try { + try { + startGate.await(); + } catch (InterruptedException e) { + interrupted.set(true); + return; + } + clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener); + } finally { + endGate.countDown(); + } + } + }); + thread.start(); + } + + startGate.countDown(); + endGate.await(); + + assertFalse(interrupted.get()); + + updateLatch.await(); + + assertFalse(failure.get()); + + for (int i = 0; i < numberOfThreads; i++) { + assertEquals(tasksSubmittedPerThread - 1, executors[i].tracking); + } + } + public void testClusterStateBatchedUpdates() throws InterruptedException { Settings settings = settingsBuilder() .put("discovery.type", "local")