Split main ClusterService method into smaller chunks #21666
Splits the main method in ClusterService into smaller chunks so that it's easier to understand and simpler to modify in subsequent PRs.
This commit is contained in:
parent c521219b2f
commit 50e25912c8
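For orientation before reading the diff: the monolithic `runTasksForExecutor` becomes a `runTasks` driver that delegates to `calculateTaskOutputs`/`executeTasks` and `publishAndApplyChanges`, carrying per-batch state in the new `TaskInputs` and `TaskOutputs` classes. The sketch below mirrors only that control flow; all types are simplified stand-ins, and only the method and class names come from the diff itself:

```java
import java.util.List;

class ClusterTaskFlowSketch {

    // Stand-in for TaskInputs: the batched tasks plus a human-readable summary.
    static class TaskInputs {
        final String summary;
        final List<Runnable> updateTasks;
        TaskInputs(String summary, List<Runnable> updateTasks) {
            this.summary = summary;
            this.updateTasks = updateTasks;
        }
    }

    // Stand-in for TaskOutputs: what executing the batch produced.
    static class TaskOutputs {
        final boolean clusterStateChanged;
        TaskOutputs(boolean clusterStateChanged) {
            this.clusterStateChanged = clusterStateChanged;
        }
    }

    // The former monolithic method becomes a thin driver over focused helpers.
    void runTasks(TaskInputs taskInputs) {
        TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs);
        if (taskOutputs.clusterStateChanged) {
            publishAndApplyChanges(taskInputs, taskOutputs);
        }
    }

    // Executes the batch and summarizes the result (stands in for executeTasks + patchVersions).
    TaskOutputs calculateTaskOutputs(TaskInputs taskInputs) {
        taskInputs.updateTasks.forEach(Runnable::run);
        return new TaskOutputs(true);
    }

    void publishAndApplyChanges(TaskInputs taskInputs, TaskOutputs taskOutputs) {
        System.out.println("publishing result of [" + taskInputs.summary + "]");
    }

    public static void main(String[] args) {
        new ClusterTaskFlowSketch().runTasks(
            new TaskInputs("demo-task", List.of(() -> System.out.println("task executed"))));
    }
}
```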
core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
@@ -21,7 +21,6 @@ package org.elasticsearch.cluster;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 
 public interface ClusterStateTaskExecutor<T> {
     /**
@@ -149,18 +148,5 @@ public interface ClusterStateTaskExecutor<T> {
             assert !isSuccess();
             return failure;
         }
-
-        /**
-         * Handle the execution result with the provided consumers
-         * @param onSuccess handler to invoke on success
-         * @param onFailure handler to invoke on failure; the throwable passed through will not be null
-         */
-        public void handle(Runnable onSuccess, Consumer<Exception> onFailure) {
-            if (failure == null) {
-                onSuccess.run();
-            } else {
-                onFailure.accept(failure);
-            }
-        }
     }
 }
core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterState.Builder;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor.BatchResult;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.LocalNodeMasterListener;
@@ -107,7 +108,7 @@ public class ClusterService extends AbstractLifecycleComponent {
 
     private TimeValue slowTaskLoggingThreshold;
 
-    private volatile PrioritizedEsThreadPoolExecutor updateTasksExecutor;
+    private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
 
     /**
      * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine
@@ -240,7 +241,7 @@ public class ClusterService extends AbstractLifecycleComponent {
         updateState(css -> new ClusterServiceState(
             ClusterState.builder(css.getClusterState()).blocks(initialBlocks).build(),
             css.getClusterStateStatus()));
-        this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME),
+        this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME),
             threadPool.getThreadContext());
     }
 
@@ -255,12 +256,12 @@ public class ClusterService extends AbstractLifecycleComponent {
                 logger.debug("failed to notify listeners on shutdown", ex);
             }
         }
-        ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS);
+        ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS);
         // close timeout listeners that did not have an ongoing timeout
         postAppliedListeners
             .stream()
             .filter(listener -> listener instanceof TimeoutClusterStateListener)
-            .map(listener -> (TimeoutClusterStateListener)listener)
+            .map(listener -> (TimeoutClusterStateListener) listener)
             .forEach(TimeoutClusterStateListener::onClose);
         remove(localNodeMasterListeners);
     }
@@ -364,7 +365,7 @@ public class ClusterService extends AbstractLifecycleComponent {
         }
         // call the post added notification on the same event thread
         try {
-            updateTasksExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
+            threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
                 @Override
                 public void run() {
                     if (timeout != null) {
@@ -448,54 +449,33 @@ public class ClusterService extends AbstractLifecycleComponent {
             return;
         }
         try {
+            @SuppressWarnings("unchecked")
+            ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) executor;
             // convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal
-            final IdentityHashMap<T, ClusterStateTaskListener> tasksIdentity = new IdentityHashMap<>(tasks);
-            final List<UpdateTask<T>> updateTasks = tasksIdentity.entrySet().stream().map(
-                entry -> new UpdateTask<>(source, entry.getKey(), config.priority(), executor, safe(entry.getValue(), logger))
+            final IdentityHashMap<Object, ClusterStateTaskListener> tasksIdentity = new IdentityHashMap<>(tasks);
+            final List<UpdateTask> updateTasks = tasksIdentity.entrySet().stream().map(
+                entry -> new UpdateTask(source, entry.getKey(), config.priority(), taskExecutor, safe(entry.getValue(), logger))
             ).collect(Collectors.toList());
 
             synchronized (updateTasksPerExecutor) {
                 LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor,
                     k -> new LinkedHashSet<>(updateTasks.size()));
-                for (@SuppressWarnings("unchecked") UpdateTask<T> existing : existingTasks) {
+                for (UpdateTask existing : existingTasks) {
                     if (tasksIdentity.containsKey(existing.task)) {
-                        throw new IllegalStateException("task [" + executor.describeTasks(Collections.singletonList(existing.task)) +
+                        throw new IllegalStateException("task [" + taskExecutor.describeTasks(Collections.singletonList(existing.task)) +
                             "] with source [" + source + "] is already queued");
                     }
                 }
                 existingTasks.addAll(updateTasks);
             }
 
-            final UpdateTask<T> firstTask = updateTasks.get(0);
+            final UpdateTask firstTask = updateTasks.get(0);
 
             final TimeValue timeout = config.timeout();
             if (timeout != null) {
-                updateTasksExecutor.execute(firstTask, threadPool.scheduler(), timeout, () -> threadPool.generic().execute(() -> {
-                    final ArrayList<UpdateTask<T>> toRemove = new ArrayList<>();
-                    for (UpdateTask<T> task : updateTasks) {
-                        if (task.processed.getAndSet(true) == false) {
-                            logger.debug("cluster state update task [{}] timed out after [{}]", source, timeout);
-                            toRemove.add(task);
-                        }
-                    }
-                    if (toRemove.isEmpty() == false) {
-                        ClusterStateTaskExecutor<T> clusterStateTaskExecutor = toRemove.get(0).executor;
-                        synchronized (updateTasksPerExecutor) {
-                            LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.get(clusterStateTaskExecutor);
-                            if (existingTasks != null) {
-                                existingTasks.removeAll(toRemove);
-                                if (existingTasks.isEmpty()) {
-                                    updateTasksPerExecutor.remove(clusterStateTaskExecutor);
-                                }
-                            }
-                        }
-                        for (UpdateTask<T> task : toRemove) {
-                            task.listener.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source));
-                        }
-                    }
-                }));
+                threadPoolExecutor.execute(firstTask, threadPool.scheduler(), timeout, () -> onTimeout(updateTasks, source, timeout));
             } else {
-                updateTasksExecutor.execute(firstTask);
+                threadPoolExecutor.execute(firstTask);
             }
         } catch (EsRejectedExecutionException e) {
             // ignore cases where we are shutting down..., there is really nothing interesting
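One subtlety in the hunk above: duplicate detection intentionally uses task identity rather than `equals`, which is why the tasks go through an `IdentityHashMap` (see the comment about identity semantics in the diff). A minimal stand-alone illustration of the difference — plain JDK code, not Elasticsearch's; the task type is just `String` here:

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

class IdentityDedupDemo {
    public static void main(String[] args) {
        // Two distinct task objects that compare equal.
        String a = new String("reroute");
        String b = new String("reroute");

        Map<String, Boolean> byEquality = new HashMap<>();
        byEquality.put(a, true);
        byEquality.put(b, true);

        Map<String, Boolean> byIdentity = new IdentityHashMap<>();
        byIdentity.put(a, true);
        byIdentity.put(b, true);

        System.out.println(byEquality.size()); // 1: equal keys collapse
        System.out.println(byIdentity.size()); // 2: only the same object collapses
    }
}
```

Two equal-but-distinct task instances are therefore both queued; only resubmitting the very same object is rejected as "already queued".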
@@ -506,11 +486,38 @@ public class ClusterService extends AbstractLifecycleComponent {
         }
     }
 
+    private void onTimeout(List<UpdateTask> updateTasks, String source, TimeValue timeout) {
+        threadPool.generic().execute(() -> {
+            final ArrayList<UpdateTask> toRemove = new ArrayList<>();
+            for (UpdateTask task : updateTasks) {
+                if (task.processed.getAndSet(true) == false) {
+                    logger.debug("cluster state update task [{}] timed out after [{}]", source, timeout);
+                    toRemove.add(task);
+                }
+            }
+            if (toRemove.isEmpty() == false) {
+                ClusterStateTaskExecutor<Object> clusterStateTaskExecutor = toRemove.get(0).executor;
+                synchronized (updateTasksPerExecutor) {
+                    LinkedHashSet<UpdateTask> existingTasks = updateTasksPerExecutor.get(clusterStateTaskExecutor);
+                    if (existingTasks != null) {
+                        existingTasks.removeAll(toRemove);
+                        if (existingTasks.isEmpty()) {
+                            updateTasksPerExecutor.remove(clusterStateTaskExecutor);
+                        }
+                    }
+                }
+                for (UpdateTask task : toRemove) {
+                    task.listener.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source));
+                }
+            }
+        });
+    }
+
     /**
      * Returns the tasks that are pending.
      */
     public List<PendingClusterTask> pendingTasks() {
-        PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending();
+        PrioritizedEsThreadPoolExecutor.Pending[] pendings = threadPoolExecutor.getPending();
         List<PendingClusterTask> pendingClusterTasks = new ArrayList<>(pendings.length);
         for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) {
             final String source;
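The extracted `onTimeout` works because `UpdateTask.processed` is an `AtomicBoolean`: whichever side flips it first — the update thread or the timeout callback — owns the task, so each task is either executed or failed with a timeout, never both. A self-contained sketch of that handshake, using stand-in types rather than the Elasticsearch classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TimeoutHandshakeDemo {
    static class Task {
        final AtomicBoolean processed = new AtomicBoolean();
        final String source;
        Task(String source) { this.source = source; }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService runner = Executors.newSingleThreadExecutor();
        Task task = new Task("demo");

        // Timeout path: only fails the task if it has not been claimed yet.
        scheduler.schedule(() -> {
            if (task.processed.getAndSet(true) == false) {
                System.out.println("task [" + task.source + "] timed out");
            }
        }, 100, TimeUnit.MILLISECONDS);

        // Execution path: only runs the task if the timeout has not claimed it.
        runner.execute(() -> {
            if (task.processed.getAndSet(true) == false) {
                System.out.println("processing [" + task.source + "]");
            }
        });

        runner.shutdown();
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```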
@@ -539,7 +546,7 @@ public class ClusterService extends AbstractLifecycleComponent {
      * Returns the number of currently pending tasks.
      */
     public int numberOfPendingTasks() {
-        return updateTasksExecutor.getNumberOfPendingTasks();
+        return threadPoolExecutor.getNumberOfPendingTasks();
     }
 
     /**
@@ -548,7 +555,7 @@ public class ClusterService extends AbstractLifecycleComponent {
      * @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue
      */
     public TimeValue getMaxTaskWaitTime() {
-        return updateTasksExecutor.getMaxTaskWaitTime();
+        return threadPoolExecutor.getMaxTaskWaitTime();
     }
 
     /** asserts that the current thread is the cluster state update thread */
@@ -582,47 +589,90 @@ public class ClusterService extends AbstractLifecycleComponent {
         }
     }
 
-    <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
-        final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>();
-        final Map<String, ArrayList<T>> processTasksBySource = new HashMap<>();
-        synchronized (updateTasksPerExecutor) {
-            LinkedHashSet<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
-            if (pending != null) {
-                for (UpdateTask<T> task : pending) {
-                    if (task.processed.getAndSet(true) == false) {
-                        logger.trace("will process {}", task);
-                        toExecute.add(task);
-                        processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task.task);
-                    } else {
-                        logger.trace("skipping {}, already processed", task);
-                    }
-                }
+    void runTasks(TaskInputs taskInputs) {
+        if (!lifecycle.started()) {
+            logger.debug("processing [{}]: ignoring, cluster service not started", taskInputs.summary);
+            return;
+        }
+
+        logger.debug("processing [{}]: execute", taskInputs.summary);
+        ClusterServiceState previousClusterServiceState = clusterServiceState();
+        ClusterState previousClusterState = previousClusterServiceState.getClusterState();
+
+        if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyOnMaster()) {
+            logger.debug("failing [{}]: local node is no longer master", taskInputs.summary);
+            taskInputs.onNoLongerMaster();
+            return;
+        }
+
+        long startTimeNS = currentTimeInNanos();
+        TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterServiceState, startTimeNS);
+        taskOutputs.notifyFailedTasks();
+
+        if (taskOutputs.clusterStateUnchanged()) {
+            taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
+            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+            logger.debug("processing [{}]: took [{}] no change in cluster_state", taskInputs.summary, executionTime);
+            warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
+        } else {
+            ClusterState newClusterState = taskOutputs.newClusterServiceState.getClusterState();
+            if (logger.isTraceEnabled()) {
+                logger.trace("cluster state updated, source [{}]\n{}", taskInputs.summary, newClusterState);
+            } else if (logger.isDebugEnabled()) {
+                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), taskInputs.summary);
+            }
+            try {
+                publishAndApplyChanges(taskInputs, taskOutputs);
+                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+                logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})", taskInputs.summary,
+                    executionTime, newClusterState.version(), newClusterState.stateUUID());
+                warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
+            } catch (Exception e) {
+                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+                final long version = newClusterState.version();
+                final String stateUUID = newClusterState.stateUUID();
+                final String fullState = newClusterState.toString();
+                logger.warn(
+                    (Supplier<?>) () -> new ParameterizedMessage(
+                        "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
+                        executionTime,
+                        version,
+                        stateUUID,
+                        taskInputs.summary,
+                        fullState),
+                    e);
+                // TODO: do we want to call updateTask.onFailure here?
             }
         }
-        if (toExecute.isEmpty()) {
-            return;
-        }
-        final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
-            String tasks = executor.describeTasks(entry.getValue());
-            return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
-        }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
+    }
 
-        if (!lifecycle.started()) {
-            logger.debug("processing [{}]: ignoring, cluster_service not started", tasksSummary);
-            return;
+    public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterServiceState previousClusterServiceState, long startTimeNS) {
+        ClusterState previousClusterState = previousClusterServiceState.getClusterState();
+        BatchResult<Object> batchResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
+        ClusterState newClusterState = batchResult.resultingState;
+        // extract those that are waiting for results
+        List<UpdateTask> nonFailedTasks = new ArrayList<>();
+        for (UpdateTask updateTask : taskInputs.updateTasks) {
+            assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
+            final ClusterStateTaskExecutor.TaskResult taskResult =
+                batchResult.executionResults.get(updateTask.task);
+            if (taskResult.isSuccess()) {
+                nonFailedTasks.add(updateTask);
+            }
         }
-        logger.debug("processing [{}]: execute", tasksSummary);
-        ClusterState previousClusterState = clusterServiceState().getClusterState();
-        if (!previousClusterState.nodes().isLocalNodeElectedMaster() && executor.runOnlyOnMaster()) {
-            logger.debug("failing [{}]: local node is no longer master", tasksSummary);
-            toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
-            return;
-        }
-        ClusterStateTaskExecutor.BatchResult<T> batchResult;
-        long startTimeNS = currentTimeInNanos();
+        newClusterState = patchVersions(previousClusterState, newClusterState);
+
+        ClusterServiceState newClusterServiceState = new ClusterServiceState(newClusterState, ClusterStateStatus.BEING_APPLIED);
+
+        return new TaskOutputs(taskInputs, previousClusterServiceState, newClusterServiceState, nonFailedTasks,
+            batchResult.executionResults);
+    }
+
+    private BatchResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) {
+        BatchResult<Object> batchResult;
         try {
-            List<T> inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
-            batchResult = executor.execute(previousClusterState, inputs);
+            List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
+            batchResult = taskInputs.executor.execute(previousClusterState, inputs);
         } catch (Exception e) {
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
             if (logger.isTraceEnabled()) {
@@ -631,65 +681,36 @@ public class ClusterService extends AbstractLifecycleComponent {
                         "failed to execute cluster state update in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
                         executionTime,
                         previousClusterState.version(),
-                        tasksSummary,
+                        taskInputs.summary,
                         previousClusterState.nodes(),
                         previousClusterState.routingTable(),
                         previousClusterState.getRoutingNodes()),
                     e);
             }
-            warnAboutSlowTaskIfNeeded(executionTime, tasksSummary);
-            batchResult = ClusterStateTaskExecutor.BatchResult.<T>builder()
-                .failures(toExecute.stream().map(updateTask -> updateTask.task)::iterator, e)
-                .build(previousClusterState);
+            warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
+            batchResult = BatchResult.builder()
+                .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
+                .build(previousClusterState);
         }
 
         assert batchResult.executionResults != null;
-        assert batchResult.executionResults.size() == toExecute.size()
-            : String.format(Locale.ROOT, "expected [%d] task result%s but was [%d]", toExecute.size(),
-            toExecute.size() == 1 ? "" : "s", batchResult.executionResults.size());
+        assert batchResult.executionResults.size() == taskInputs.updateTasks.size()
+            : String.format(Locale.ROOT, "expected [%d] task result%s but was [%d]", taskInputs.updateTasks.size(),
+            taskInputs.updateTasks.size() == 1 ? "" : "s", batchResult.executionResults.size());
         boolean assertsEnabled = false;
         assert (assertsEnabled = true);
         if (assertsEnabled) {
-            for (UpdateTask<T> updateTask : toExecute) {
+            for (UpdateTask updateTask : taskInputs.updateTasks) {
                 assert batchResult.executionResults.containsKey(updateTask.task) :
                     "missing task result for " + updateTask;
             }
         }
 
-        ClusterState newClusterState = batchResult.resultingState;
-        final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
-        // fail all tasks that have failed and extract those that are waiting for results
-        for (UpdateTask<T> updateTask : toExecute) {
-            assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
-            final ClusterStateTaskExecutor.TaskResult executionResult =
-                batchResult.executionResults.get(updateTask.task);
-            executionResult.handle(
-                () -> proccessedListeners.add(updateTask),
-                ex -> {
-                    logger.debug(
-                        (Supplier<?>)
-                            () -> new ParameterizedMessage("cluster state update task {} failed", updateTask), ex);
-                    updateTask.listener.onFailure(updateTask.source, ex);
-                }
-            );
-        }
+        return batchResult;
+    }
 
-        if (previousClusterState == newClusterState) {
-            for (UpdateTask<T> task : proccessedListeners) {
-                if (task.listener instanceof AckedClusterStateTaskListener) {
-                    //no need to wait for ack if nothing changed, the update can be counted as acknowledged
-                    ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null);
-                }
-                task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
-            }
-            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-            logger.debug("processing [{}]: took [{}] no change in cluster_state", tasksSummary, executionTime);
-            warnAboutSlowTaskIfNeeded(executionTime, tasksSummary);
-            return;
-        }
-
-        try {
-            ArrayList<Discovery.AckListener> ackListeners = new ArrayList<>();
+    private ClusterState patchVersions(ClusterState previousClusterState, ClusterState newClusterState) {
+        if (previousClusterState != newClusterState) {
             if (newClusterState.nodes().isLocalNodeElectedMaster()) {
                 // only the master controls the version numbers
                 Builder builder = ClusterState.builder(newClusterState).incrementVersion();
@@ -701,152 +722,221 @@ public class ClusterService extends AbstractLifecycleComponent {
                     builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
                 }
                 newClusterState = builder.build();
-                for (UpdateTask<T> task : proccessedListeners) {
-                    if (task.listener instanceof AckedClusterStateTaskListener) {
-                        final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener;
-                        if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) {
-                            ackedListener.onAckTimeout();
-                        } else {
-                            try {
-                                ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(),
-                                    threadPool));
-                            } catch (EsRejectedExecutionException ex) {
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
-                                }
-                                //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
-                                ackedListener.onAckTimeout();
-                            }
-                        }
-                    }
-                }
             }
-            final Discovery.AckListener ackListener = new DelegetingAckListener(ackListeners);
+        }
+        return newClusterState;
+    }
 
-            if (logger.isTraceEnabled()) {
-                logger.trace("cluster state updated, source [{}]\n{}", tasksSummary, newClusterState);
-            } else if (logger.isDebugEnabled()) {
-                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), tasksSummary);
-            }
-
-            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(tasksSummary, newClusterState, previousClusterState);
-            // new cluster state, notify all listeners
-            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
-            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
-                String summary = nodesDelta.shortSummary();
-                if (summary.length() > 0) {
-                    logger.info("{}, reason: {}", summary, tasksSummary);
-                }
-            }
+    private void publishAndApplyChanges(TaskInputs taskInputs, TaskOutputs taskOutputs) {
+        ClusterState previousClusterState = taskOutputs.previousClusterServiceState.getClusterState();
+        ClusterState newClusterState = taskOutputs.newClusterServiceState.getClusterState();
+
+        ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(taskInputs.summary, newClusterState, previousClusterState);
+        // new cluster state, notify all listeners
+        final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
+        if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
+            String summary = nodesDelta.shortSummary();
+            if (summary.length() > 0) {
+                logger.info("{}, reason: {}", summary, taskInputs.summary);
+            }
+        }
 
-            nodeConnectionsService.connectToNodes(clusterChangedEvent.nodesDelta().addedNodes());
-
-            // if we are the master, publish the new state to all nodes
-            // we publish here before we send a notification to all the listeners, since if it fails
-            // we don't want to notify
-            if (newClusterState.nodes().isLocalNodeElectedMaster()) {
-                logger.debug("publishing cluster state version [{}]", newClusterState.version());
-                try {
-                    clusterStatePublisher.accept(clusterChangedEvent, ackListener);
-                } catch (Discovery.FailedToCommitClusterStateException t) {
-                    final long version = newClusterState.version();
-                    logger.warn(
-                        (Supplier<?>) () -> new ParameterizedMessage(
-                            "failing [{}]: failed to commit cluster state version [{}]", tasksSummary, version),
-                        t);
-                    // ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state
-                    nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().addedNodes());
-                    proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t));
-                    return;
-                }
-            }
-
-            // update the current cluster state
-            ClusterState finalNewClusterState = newClusterState;
-            updateState(css -> new ClusterServiceState(finalNewClusterState, ClusterStateStatus.BEING_APPLIED));
-            logger.debug("set local cluster state to version {}", newClusterState.version());
-            try {
-                // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
-                if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
-                    final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
-                    clusterSettings.applySettings(incomingSettings);
-                }
-            } catch (Exception ex) {
-                logger.warn("failed to apply cluster settings", ex);
-            }
-            for (ClusterStateListener listener : preAppliedListeners) {
-                try {
-                    logger.trace("calling [{}] with change to version [{}]", listener, newClusterState.version());
-                    listener.clusterChanged(clusterChangedEvent);
-                } catch (Exception ex) {
-                    logger.warn("failed to notify ClusterStateListener", ex);
-                }
-            }
-
-            nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes());
-
-            updateState(css -> new ClusterServiceState(css.getClusterState(), ClusterStateStatus.APPLIED));
-
-            for (ClusterStateListener listener : postAppliedListeners) {
-                try {
-                    logger.trace("calling [{}] with change to version [{}]", listener, newClusterState.version());
-                    listener.clusterChanged(clusterChangedEvent);
-                } catch (Exception ex) {
-                    logger.warn("failed to notify ClusterStateListener", ex);
-                }
-            }
-
-            //manual ack only from the master at the end of the publish
-            if (newClusterState.nodes().isLocalNodeElectedMaster()) {
-                try {
-                    ackListener.onNodeAck(newClusterState.nodes().getLocalNode(), null);
-                } catch (Exception e) {
-                    final DiscoveryNode localNode = newClusterState.nodes().getLocalNode();
-                    logger.debug(
-                        (Supplier<?>) () -> new ParameterizedMessage("error while processing ack for master node [{}]", localNode),
-                        e);
-                }
-            }
-
-            for (UpdateTask<T> task : proccessedListeners) {
-                task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
-            }
-
-            try {
-                executor.clusterStatePublished(clusterChangedEvent);
-            } catch (Exception e) {
-                logger.error(
-                    (Supplier<?>) () -> new ParameterizedMessage(
-                        "exception thrown while notifying executor of new cluster state publication [{}]",
-                        tasksSummary),
-                    e);
-            }
-
-            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-            logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})", tasksSummary,
-                executionTime, newClusterState.version(), newClusterState.stateUUID());
-            warnAboutSlowTaskIfNeeded(executionTime, tasksSummary);
-        } catch (Exception e) {
-            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-            final long version = newClusterState.version();
-            final String stateUUID = newClusterState.stateUUID();
-            final String fullState = newClusterState.toString();
-            logger.warn(
-                (Supplier<?>) () -> new ParameterizedMessage(
-                    "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
-                    executionTime,
-                    version,
-                    stateUUID,
-                    tasksSummary,
-                    fullState),
-                e);
-            // TODO: do we want to call updateTask.onFailure here?
-        }
+        final Discovery.AckListener ackListener = newClusterState.nodes().isLocalNodeElectedMaster() ?
+            taskOutputs.createAckListener(threadPool, newClusterState) :
+            null;
+
+        nodeConnectionsService.connectToNodes(clusterChangedEvent.nodesDelta().addedNodes());
+
+        // if we are the master, publish the new state to all nodes
+        // we publish here before we send a notification to all the listeners, since if it fails
+        // we don't want to notify
+        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
+            logger.debug("publishing cluster state version [{}]", newClusterState.version());
+            try {
+                clusterStatePublisher.accept(clusterChangedEvent, ackListener);
+            } catch (Discovery.FailedToCommitClusterStateException t) {
+                final long version = newClusterState.version();
+                logger.warn(
+                    (Supplier<?>) () -> new ParameterizedMessage(
+                        "failing [{}]: failed to commit cluster state version [{}]", taskInputs.summary, version),
+                    t);
+                // ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state
+                nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().addedNodes());
+                taskOutputs.publishingFailed(t);
+                return;
+            }
+        }
+
+        // update the current cluster state
+        updateState(css -> taskOutputs.newClusterServiceState);
+        logger.debug("set local cluster state to version {}", newClusterState.version());
+        try {
+            // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
+            if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
+                final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
+                clusterSettings.applySettings(incomingSettings);
+            }
+        } catch (Exception ex) {
+            logger.warn("failed to apply cluster settings", ex);
+        }
+        for (ClusterStateListener listener : preAppliedListeners) {
+            try {
+                logger.trace("calling [{}] with change to version [{}]", listener, newClusterState.version());
+                listener.clusterChanged(clusterChangedEvent);
+            } catch (Exception ex) {
+                logger.warn("failed to notify ClusterStateListener", ex);
+            }
+        }
+
+        nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes());
+
+        updateState(css -> new ClusterServiceState(css.getClusterState(), ClusterStateStatus.APPLIED));
+
+        for (ClusterStateListener listener : postAppliedListeners) {
+            try {
+                logger.trace("calling [{}] with change to version [{}]", listener, newClusterState.version());
+                listener.clusterChanged(clusterChangedEvent);
+            } catch (Exception ex) {
+                logger.warn("failed to notify ClusterStateListener", ex);
+            }
+        }
+
+        //manual ack only from the master at the end of the publish
+        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
+            try {
+                ackListener.onNodeAck(newClusterState.nodes().getLocalNode(), null);
+            } catch (Exception e) {
+                final DiscoveryNode localNode = newClusterState.nodes().getLocalNode();
+                logger.debug(
+                    (Supplier<?>) () -> new ParameterizedMessage("error while processing ack for master node [{}]", localNode),
+                    e);
+            }
+        }
+
+        taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);
+
+        try {
+            taskOutputs.clusterStatePublished(clusterChangedEvent);
+        } catch (Exception e) {
+            logger.error(
+                (Supplier<?>) () -> new ParameterizedMessage(
+                    "exception thrown while notifying executor of new cluster state publication [{}]",
+                    taskInputs.summary),
+                e);
+        }
+    }
+
+    /**
+     * Represents a set of tasks to be processed together with their executor
+     */
+    class TaskInputs {
+        public final String summary;
+        public final ArrayList<UpdateTask> updateTasks;
+        public final ClusterStateTaskExecutor<Object> executor;
+
+        TaskInputs(ClusterStateTaskExecutor<Object> executor, ArrayList<UpdateTask> updateTasks, String summary) {
+            this.summary = summary;
+            this.executor = executor;
+            this.updateTasks = updateTasks;
+        }
+
+        public boolean runOnlyOnMaster() {
+            return executor.runOnlyOnMaster();
+        }
+
+        public void onNoLongerMaster() {
+            updateTasks.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
+        }
+    }
+
+    /**
+     * Output created by executing a set of tasks provided as TaskInputs
+     */
+    class TaskOutputs {
+        public final TaskInputs taskInputs;
+        public final ClusterServiceState previousClusterServiceState;
+        public final ClusterServiceState newClusterServiceState;
+        public final List<UpdateTask> nonFailedTasks;
+        public final Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults;
+
+        public TaskOutputs(TaskInputs taskInputs, ClusterServiceState previousClusterServiceState,
+                           ClusterServiceState newClusterServiceState, List<UpdateTask> nonFailedTasks,
+                           Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults) {
+            this.taskInputs = taskInputs;
+            this.previousClusterServiceState = previousClusterServiceState;
+            this.newClusterServiceState = newClusterServiceState;
+            this.nonFailedTasks = nonFailedTasks;
+            this.executionResults = executionResults;
+        }
+
+        public void publishingFailed(Discovery.FailedToCommitClusterStateException t) {
+            nonFailedTasks.forEach(task -> task.listener.onFailure(task.source, t));
+        }
+
+        public void processedDifferentClusterState(ClusterState previousClusterState, ClusterState newClusterState) {
+            nonFailedTasks.forEach(task -> task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState));
+        }
+
+        public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
+            taskInputs.executor.clusterStatePublished(clusterChangedEvent);
+        }
+
+        public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
+            ArrayList<Discovery.AckListener> ackListeners = new ArrayList<>();
+
+            //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
+            nonFailedTasks.stream().filter(task -> task.listener instanceof AckedClusterStateTaskListener).forEach(task -> {
+                final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener;
+                if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) {
+                    ackedListener.onAckTimeout();
+                } else {
+                    try {
+                        ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(),
+                            threadPool));
+                    } catch (EsRejectedExecutionException ex) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
+                        }
+                        //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
+                        ackedListener.onAckTimeout();
+                    }
+                }
+            });
+
+            return new DelegetingAckListener(ackListeners);
+        }
+
+        public boolean clusterStateUnchanged() {
+            return previousClusterServiceState.getClusterState() == newClusterServiceState.getClusterState();
+        }
+
+        public void notifyFailedTasks() {
+            // fail all tasks that have failed
+            for (UpdateTask updateTask : taskInputs.updateTasks) {
+                assert executionResults.containsKey(updateTask.task) : "missing " + updateTask;
+                final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task);
+                if (taskResult.isSuccess() == false) {
+                    updateTask.listener.onFailure(updateTask.source, taskResult.getFailure());
+                }
+            }
+        }
+
+        public void notifySuccessfulTasksOnUnchangedClusterState() {
+            ClusterState clusterState = newClusterServiceState.getClusterState();
+            nonFailedTasks.forEach(task -> {
+                if (task.listener instanceof AckedClusterStateTaskListener) {
+                    //no need to wait for ack if nothing changed, the update can be counted as acknowledged
+                    ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null);
+                }
+                task.listener.clusterStateProcessed(task.source, clusterState, clusterState);
+            });
+        }
     }
 
     // this one is overridden in tests so we can control time
-    protected long currentTimeInNanos() {return System.nanoTime();}
+    protected long currentTimeInNanos() {
+        return System.nanoTime();
+    }
 
     private static SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, Logger logger) {
         if (listener instanceof AckedClusterStateTaskListener) {
@@ -943,14 +1033,15 @@ public class ClusterService extends AbstractLifecycleComponent {
         }
     }
 
-    class UpdateTask<T> extends SourcePrioritizedRunnable {
+    class UpdateTask extends SourcePrioritizedRunnable {
 
-        public final T task;
+        public final Object task;
         public final ClusterStateTaskListener listener;
-        private final ClusterStateTaskExecutor<T> executor;
+        private final ClusterStateTaskExecutor<Object> executor;
         public final AtomicBoolean processed = new AtomicBoolean();
 
-        UpdateTask(String source, T task, Priority priority, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
+        UpdateTask(String source, Object task, Priority priority, ClusterStateTaskExecutor<Object> executor,
+                   ClusterStateTaskListener listener) {
             super(priority, source);
             this.task = task;
             this.executor = executor;
@@ -962,7 +1053,31 @@ public class ClusterService extends AbstractLifecycleComponent {
             // if this task is already processed, the executor shouldn't execute other tasks (that arrived later),
             // to give other executors a chance to execute their tasks.
             if (processed.get() == false) {
-                runTasksForExecutor(executor);
+                final ArrayList<UpdateTask> toExecute = new ArrayList<>();
+                final Map<String, ArrayList<Object>> processTasksBySource = new HashMap<>();
+                synchronized (updateTasksPerExecutor) {
+                    LinkedHashSet<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
+                    if (pending != null) {
+                        for (UpdateTask task : pending) {
+                            if (task.processed.getAndSet(true) == false) {
+                                logger.trace("will process {}", task);
+                                toExecute.add(task);
+                                processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task.task);
+                            } else {
+                                logger.trace("skipping {}, already processed", task);
+                            }
+                        }
+                    }
+                }
+
+                if (toExecute.isEmpty() == false) {
+                    final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
+                        String tasks = executor.describeTasks(entry.getValue());
+                        return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
+                    }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
+
+                    runTasks(new TaskInputs(executor, toExecute, tasksSummary));
+                }
             }
         }
 
@@ -1130,12 +1245,7 @@ public class ClusterService extends AbstractLifecycleComponent {
             countDown = Math.max(1, countDown);
             logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
             this.countDown = new CountDown(countDown);
-            this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
-                @Override
-                public void run() {
-                    onTimeout();
-                }
-            });
+            this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, () -> onTimeout());
         }
 
         @Override