Add callback for publication of new cluster state

This commit adds a callback for a cluster state task executor that will
be invoked if the execution of a batch of cluster state update tasks
led to a new cluster state and that new cluster state was successfully
published.

Closes #15482
This commit is contained in:
Jason Tedor 2015-12-16 18:43:31 -05:00
parent 12e241ff1a
commit 7e14245c69
3 changed files with 36 additions and 3 deletions

View File

@ -37,6 +37,13 @@ public interface ClusterStateTaskExecutor<T> {
return true; return true;
} }
/**
* Callback invoked after new cluster state is published. Note that
* this method is not invoked if the cluster state was not updated.
*/
default void clusterStatePublished(ClusterState newClusterState) {
}
/** /**
* Represents the result of a batched execution of cluster state update tasks * Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task * @param <T> the type of the cluster state update task

View File

@ -605,6 +605,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState); task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
} }
executor.clusterStatePublished(newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID()); logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, source); warnAboutSlowTaskIfNeeded(executionTime, source);

View File

@ -43,7 +43,14 @@ import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -53,7 +60,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/** /**
* *
@ -753,18 +764,30 @@ public class ClusterServiceIT extends ESIntegTestCase {
class TaskExecutor implements ClusterStateTaskExecutor<Task> { class TaskExecutor implements ClusterStateTaskExecutor<Task> {
private AtomicInteger counter = new AtomicInteger(); private AtomicInteger counter = new AtomicInteger();
private AtomicInteger batches = new AtomicInteger();
private AtomicInteger published = new AtomicInteger();
@Override @Override
public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception { public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
tasks.forEach(task -> task.execute()); tasks.forEach(task -> task.execute());
counter.addAndGet(tasks.size()); counter.addAndGet(tasks.size());
return BatchResult.<Task>builder().successes(tasks).build(currentState); ClusterState maybeUpdatedClusterState = currentState;
if (randomBoolean()) {
maybeUpdatedClusterState = ClusterState.builder(currentState).build();
batches.incrementAndGet();
}
return BatchResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
} }
@Override @Override
public boolean runOnlyOnMaster() { public boolean runOnlyOnMaster() {
return false; return false;
} }
@Override
public void clusterStatePublished(ClusterState newClusterState) {
published.incrementAndGet();
}
} }
int numberOfThreads = randomIntBetween(2, 8); int numberOfThreads = randomIntBetween(2, 8);
int tasksSubmittedPerThread = randomIntBetween(1, 1024); int tasksSubmittedPerThread = randomIntBetween(1, 1024);
@ -838,6 +861,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
for (TaskExecutor executor : executors) { for (TaskExecutor executor : executors) {
if (counts.containsKey(executor)) { if (counts.containsKey(executor)) {
assertEquals((int) counts.get(executor), executor.counter.get()); assertEquals((int) counts.get(executor), executor.counter.get());
assertEquals(executor.batches.get(), executor.published.get());
} }
} }