diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index ab85d9540f0..fb22c2ca368 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -37,6 +37,13 @@ public interface ClusterStateTaskExecutor { return true; } + /** + * Callback invoked after new cluster state is published. Note that + * this method is not invoked if the cluster state was not updated. + */ + default void clusterStatePublished(ClusterState newClusterState) { + } + /** * Represents the result of a batched execution of cluster state update tasks * @param the type of the cluster state update task diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index b8c898a31e9..4888fc9d48e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -605,6 +605,8 @@ public class InternalClusterService extends AbstractLifecycleComponent { private AtomicInteger counter = new AtomicInteger(); + private AtomicInteger batches = new AtomicInteger(); + private AtomicInteger published = new AtomicInteger(); @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { tasks.forEach(task -> task.execute()); counter.addAndGet(tasks.size()); - return BatchResult.builder().successes(tasks).build(currentState); + ClusterState maybeUpdatedClusterState = currentState; + if (randomBoolean()) { + maybeUpdatedClusterState = ClusterState.builder(currentState).build(); + batches.incrementAndGet(); + } + return BatchResult.builder().successes(tasks).build(maybeUpdatedClusterState); } @Override public boolean runOnlyOnMaster() { return false; } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + published.incrementAndGet(); + } } int numberOfThreads = randomIntBetween(2, 8); int tasksSubmittedPerThread = randomIntBetween(1, 1024); @@ -838,6 +861,7 @@ public class ClusterServiceIT extends ESIntegTestCase { for (TaskExecutor executor : executors) { if (counts.containsKey(executor)) { assertEquals((int) counts.get(executor), executor.counter.get()); + assertEquals(executor.batches.get(), executor.published.get()); } }