From 7e14245c698b6206824fe320c8b64c384493d852 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 16 Dec 2015 18:43:31 -0500 Subject: [PATCH] Add callback for publication of new cluster state This commit adds a callback for a cluster state task executor that will be invoked if the execution of a batch of cluster state update tasks led to a new cluster state and that new cluster state was successfully published. Closes #15482 --- .../cluster/ClusterStateTaskExecutor.java | 7 +++++ .../service/InternalClusterService.java | 2 ++ .../cluster/ClusterServiceIT.java | 30 +++++++++++++++++-- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index ab85d9540f0..fb22c2ca368 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -37,6 +37,13 @@ public interface ClusterStateTaskExecutor { return true; } + /** + * Callback invoked after new cluster state is published. Note that + * this method is not invoked if the cluster state was not updated. + */ + default void clusterStatePublished(ClusterState newClusterState) { + } + /** * Represents the result of a batched execution of cluster state update tasks * @param the type of the cluster state update task diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index b8c898a31e9..4888fc9d48e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -605,6 +605,8 @@ public class InternalClusterService extends AbstractLifecycleComponent { private AtomicInteger counter = new AtomicInteger(); + private AtomicInteger batches = new AtomicInteger(); + private AtomicInteger published = new AtomicInteger(); @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { tasks.forEach(task -> task.execute()); counter.addAndGet(tasks.size()); - return BatchResult.builder().successes(tasks).build(currentState); + ClusterState maybeUpdatedClusterState = currentState; + if (randomBoolean()) { + maybeUpdatedClusterState = ClusterState.builder(currentState).build(); + batches.incrementAndGet(); + } + return BatchResult.builder().successes(tasks).build(maybeUpdatedClusterState); } @Override public boolean runOnlyOnMaster() { return false; } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + published.incrementAndGet(); + } } int numberOfThreads = randomIntBetween(2, 8); int tasksSubmittedPerThread = randomIntBetween(1, 1024); @@ -838,6 +861,7 @@ public class ClusterServiceIT extends ESIntegTestCase { for (TaskExecutor executor : executors) { if (counts.containsKey(executor)) { assertEquals((int) counts.get(executor), executor.counter.get()); + assertEquals(executor.batches.get(), executor.published.get()); } }