Add test that cluster state update tasks are executed in order
This commit adds a test that ensures that cluster state update tasks are executed in order from the perspective of a single thread.
This commit is contained in:
parent
71d146b940
commit
270b08b302
|
@ -796,6 +796,100 @@ public class ClusterServiceIT extends ESIntegTestCase {
|
|||
assertTrue(published.get());
|
||||
}
|
||||
|
||||
// test that for a single thread, tasks are executed in the order
|
||||
// that they are submitted
|
||||
public void testClusterStateUpdateTasksAreExecutedInOrder() throws InterruptedException {
|
||||
Settings settings = settingsBuilder()
|
||||
.put("discovery.type", "local")
|
||||
.build();
|
||||
internalCluster().startNode(settings);
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
|
||||
|
||||
class TaskExecutor implements ClusterStateTaskExecutor<Integer> {
|
||||
int tracking = -1;
|
||||
|
||||
@Override
|
||||
public BatchResult<Integer> execute(ClusterState currentState, List<Integer> tasks) throws Exception {
|
||||
for (Integer task : tasks) {
|
||||
try {
|
||||
assertEquals("task was executed out of order", tracking + 1, (int)task);
|
||||
tracking++;
|
||||
} catch (AssertionError e) {
|
||||
return BatchResult.<Integer>builder().failures(tasks, e).build(currentState);
|
||||
}
|
||||
}
|
||||
return BatchResult.<Integer>builder().successes(tasks).build(ClusterState.builder(currentState).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
int numberOfThreads = randomIntBetween(2, 8);
|
||||
TaskExecutor[] executors = new TaskExecutor[numberOfThreads];
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executors[i] = new TaskExecutor();
|
||||
}
|
||||
|
||||
int tasksSubmittedPerThread = randomIntBetween(2, 1024);
|
||||
|
||||
AtomicBoolean failure = new AtomicBoolean();
|
||||
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
|
||||
|
||||
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
|
||||
@Override
|
||||
public void onFailure(String source, Throwable t) {
|
||||
logger.debug("failure: [{}]", t, source);
|
||||
failure.set(true);
|
||||
updateLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
updateLatch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
CountDownLatch startGate = new CountDownLatch(1);
|
||||
CountDownLatch endGate = new CountDownLatch(numberOfThreads);
|
||||
AtomicBoolean interrupted = new AtomicBoolean();
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
final int index = i;
|
||||
Thread thread = new Thread(() -> {
|
||||
for (int j = 0; j < tasksSubmittedPerThread; j++) {
|
||||
try {
|
||||
try {
|
||||
startGate.await();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted.set(true);
|
||||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener);
|
||||
} finally {
|
||||
endGate.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
}
|
||||
|
||||
startGate.countDown();
|
||||
endGate.await();
|
||||
|
||||
assertFalse(interrupted.get());
|
||||
|
||||
updateLatch.await();
|
||||
|
||||
assertFalse(failure.get());
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
assertEquals(tasksSubmittedPerThread - 1, executors[i].tracking);
|
||||
}
|
||||
}
|
||||
|
||||
public void testClusterStateBatchedUpdates() throws InterruptedException {
|
||||
Settings settings = settingsBuilder()
|
||||
.put("discovery.type", "local")
|
||||
|
|
Loading…
Reference in New Issue