diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 6e62b8ec346..2ed1b68fb8a 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -1047,7 +1047,7 @@
-
+
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java
index 44c604dc8b8..8924f81a86c 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java
@@ -194,14 +194,14 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
}
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
- ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(),
- gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMaxTaskWaitTime());
+ ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
+ gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
return prepareResponse(request, response, clusterState, waitFor);
}
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
- ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(),
- gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMaxTaskWaitTime());
+ ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
+ gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
boolean valid = prepareResponse(request, response, clusterState, waitFor);
assert valid || timedOut;
// we check for a timeout here since this method might be called from the wait_for_events
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java
index c15758de3cb..cd58bb8d6d4 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java
@@ -65,7 +65,7 @@ public class TransportPendingClusterTasksAction extends TransportMasterNodeReadA
@Override
protected void masterOperation(PendingClusterTasksRequest request, ClusterState state, ActionListener listener) {
logger.trace("fetching pending tasks from cluster service");
- final List pendingTasks = clusterService.pendingTasks();
+ final List pendingTasks = clusterService.getMasterService().pendingTasks();
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java
index 5ef06f178a8..89b3727c9f4 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -22,7 +22,6 @@ package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
index e0c35a12c22..d191bc01756 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
@@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
@@ -42,7 +43,7 @@ public class ClusterStateObserver {
private final Predicate MATCH_ALL_CHANGES_PREDICATE = state -> true;
- private final ClusterService clusterService;
+ private final ClusterApplierService clusterApplierService;
private final ThreadContext contextHolder;
volatile TimeValue timeOutValue;
@@ -74,7 +75,12 @@ public class ClusterStateObserver {
*/
public ClusterStateObserver(ClusterState initialState, ClusterService clusterService, @Nullable TimeValue timeout, Logger logger,
ThreadContext contextHolder) {
- this.clusterService = clusterService;
+ this(initialState, clusterService.getClusterApplierService(), timeout, logger, contextHolder);
+ }
+
+ public ClusterStateObserver(ClusterState initialState, ClusterApplierService clusterApplierService, @Nullable TimeValue timeout,
+ Logger logger, ThreadContext contextHolder) {
+ this.clusterApplierService = clusterApplierService;
this.lastObservedState = new AtomicReference<>(new StoredState(initialState));
this.timeOutValue = timeout;
if (timeOutValue != null) {
@@ -89,7 +95,7 @@ public class ClusterStateObserver {
if (observingContext.get() != null) {
throw new ElasticsearchException("cannot set current cluster state while waiting for a cluster state change");
}
- ClusterState clusterState = clusterService.state();
+ ClusterState clusterState = clusterApplierService.state();
lastObservedState.set(new StoredState(clusterState));
return clusterState;
}
@@ -135,7 +141,7 @@ public class ClusterStateObserver {
logger.trace("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry
timedOut = true;
- lastObservedState.set(new StoredState(clusterService.state()));
+ lastObservedState.set(new StoredState(clusterApplierService.state()));
listener.onTimeout(timeOutValue);
return;
}
@@ -151,7 +157,7 @@ public class ClusterStateObserver {
// sample a new state. This state maybe *older* than the supplied state if we are called from an applier,
// which wants to wait for something else to happen
- ClusterState newState = clusterService.state();
+ ClusterState newState = clusterApplierService.state();
if (lastObservedState.get().isOlderOrDifferentMaster(newState) && statePredicate.test(newState)) {
// good enough, let's go.
logger.trace("observer: sampled state accepted by predicate ({})", newState);
@@ -163,7 +169,7 @@ public class ClusterStateObserver {
if (!observingContext.compareAndSet(null, context)) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
- clusterService.addTimeoutListener(timeoutTimeLeftMS == null ? null : new TimeValue(timeoutTimeLeftMS), clusterStateListener);
+ clusterApplierService.addTimeoutListener(timeoutTimeLeftMS == null ? null : new TimeValue(timeoutTimeLeftMS), clusterStateListener);
}
}
@@ -179,7 +185,7 @@ public class ClusterStateObserver {
final ClusterState state = event.state();
if (context.statePredicate.test(state)) {
if (observingContext.compareAndSet(context, null)) {
- clusterService.removeTimeoutListener(this);
+ clusterApplierService.removeTimeoutListener(this);
logger.trace("observer: accepting cluster state change ({})", state);
lastObservedState.set(new StoredState(state));
context.listener.onNewClusterState(state);
@@ -198,12 +204,12 @@ public class ClusterStateObserver {
// No need to remove listener as it is the responsibility of the thread that set observingContext to null
return;
}
- ClusterState newState = clusterService.state();
+ ClusterState newState = clusterApplierService.state();
if (lastObservedState.get().isOlderOrDifferentMaster(newState) && context.statePredicate.test(newState)) {
// double check we're still listening
if (observingContext.compareAndSet(context, null)) {
logger.trace("observer: post adding listener: accepting current cluster state ({})", newState);
- clusterService.removeTimeoutListener(this);
+ clusterApplierService.removeTimeoutListener(this);
lastObservedState.set(new StoredState(newState));
context.listener.onNewClusterState(newState);
} else {
@@ -220,7 +226,7 @@ public class ClusterStateObserver {
if (context != null) {
logger.trace("observer: cluster service closed. notifying listener.");
- clusterService.removeTimeoutListener(this);
+ clusterApplierService.removeTimeoutListener(this);
context.listener.onClusterServiceClose();
}
}
@@ -229,11 +235,11 @@ public class ClusterStateObserver {
public void onTimeout(TimeValue timeout) {
ObservingContext context = observingContext.getAndSet(null);
if (context != null) {
- clusterService.removeTimeoutListener(this);
+ clusterApplierService.removeTimeoutListener(this);
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
logger.trace("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry
- lastObservedState.set(new StoredState(clusterService.state()));
+ lastObservedState.set(new StoredState(clusterApplierService.state()));
timedOut = true;
context.listener.onTimeout(timeOutValue);
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
index 3693447cfb6..8e50fddb9b1 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
@@ -71,21 +71,18 @@ public interface ClusterStateTaskExecutor {
* @param the type of the cluster state update task
*/
class ClusterTasksResult {
- public final boolean noMaster;
@Nullable
public final ClusterState resultingState;
public final Map executionResults;
/**
* Construct an execution result instance with a correspondence between the tasks and their execution result
- * @param noMaster whether this node steps down as master or has lost connection to the master
* @param resultingState the resulting cluster state
* @param executionResults the correspondence between tasks and their outcome
*/
- ClusterTasksResult(boolean noMaster, ClusterState resultingState, Map executionResults) {
+ ClusterTasksResult(ClusterState resultingState, Map executionResults) {
this.resultingState = resultingState;
this.executionResults = executionResults;
- this.noMaster = noMaster;
}
public static Builder builder() {
@@ -124,11 +121,11 @@ public interface ClusterStateTaskExecutor {
}
public ClusterTasksResult build(ClusterState resultingState) {
- return new ClusterTasksResult<>(false, resultingState, executionResults);
+ return new ClusterTasksResult<>(resultingState, executionResults);
}
ClusterTasksResult build(ClusterTasksResult result, ClusterState previousState) {
- return new ClusterTasksResult<>(result.noMaster, result.resultingState == null ? previousState : result.resultingState,
+ return new ClusterTasksResult<>(result.resultingState == null ? previousState : result.resultingState,
executionResults);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java
index 757c8b0c82e..b2ab13fac2a 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java
@@ -18,6 +18,8 @@
*/
package org.elasticsearch.cluster;
+import org.elasticsearch.cluster.service.MasterService;
+
import java.util.List;
public interface ClusterStateTaskListener {
@@ -28,7 +30,8 @@ public interface ClusterStateTaskListener {
void onFailure(String source, Exception e);
/**
- * called when the task was rejected because the local node is no longer master
+ * called when the task was rejected because the local node is no longer master.
+ * Used only for tasks submitted to {@link MasterService}.
*/
default void onNoLongerMaster(String source) {
onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
diff --git a/core/src/main/java/org/elasticsearch/cluster/LocalClusterUpdateTask.java b/core/src/main/java/org/elasticsearch/cluster/LocalClusterUpdateTask.java
index 9692ff8d4e1..89d85fa2e14 100644
--- a/core/src/main/java/org/elasticsearch/cluster/LocalClusterUpdateTask.java
+++ b/core/src/main/java/org/elasticsearch/cluster/LocalClusterUpdateTask.java
@@ -50,25 +50,11 @@ public abstract class LocalClusterUpdateTask implements ClusterStateTaskConfig,
return ClusterTasksResult.builder().successes(tasks).build(result, currentState);
}
- /**
- * node stepped down as master or has lost connection to the master
- */
- public static ClusterTasksResult noMaster() {
- return new ClusterTasksResult(true, null, null);
- }
-
/**
* no changes were made to the cluster state. Useful to execute a runnable on the cluster state applier thread
*/
public static ClusterTasksResult unchanged() {
- return new ClusterTasksResult(false, null, null);
- }
-
- /**
- * locally apply cluster state received from a master
- */
- public static ClusterTasksResult newState(ClusterState clusterState) {
- return new ClusterTasksResult(false, clusterState, null);
+ return new ClusterTasksResult<>(null, null);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java
index 4522dfcf98f..fd7f8f6811f 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java
@@ -178,8 +178,8 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
/**
* Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
*/
- private void scheduleIfNeeded(long currentNanoTime, ClusterState state) {
- assertClusterStateThread();
+ private synchronized void scheduleIfNeeded(long currentNanoTime, ClusterState state) {
+ assertClusterOrMasterStateThread();
long nextDelayNanos = UnassignedInfo.findNextDelayedAllocation(currentNanoTime, state);
if (nextDelayNanos < 0) {
logger.trace("no need to schedule reroute - no delayed unassigned shards");
@@ -214,7 +214,7 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
}
// protected so that it can be overridden (and disabled) by unit tests
- protected void assertClusterStateThread() {
- ClusterService.assertClusterStateThread();
+ protected void assertClusterOrMasterStateThread() {
+ assert ClusterService.assertClusterOrMasterStateThread();
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java
new file mode 100644
index 00000000000..ef3135af24d
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.service;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+
+import java.util.function.Supplier;
+
+@FunctionalInterface
+public interface ClusterApplier {
+ /**
+ * Method to invoke when a new cluster state is available to be applied
+ *
+ * @param source information where the cluster state came from
+ * @param clusterStateSupplier the cluster state supplier which provides the latest cluster state to apply
+ * @param listener callback that is invoked after cluster state is applied
+ */
+ void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterStateTaskListener listener);
+}
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
new file mode 100644
index 00000000000..540881718fc
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.service;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Supplier;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateApplier;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.LocalNodeMasterListener;
+import org.elasticsearch.cluster.NodeConnectionsService;
+import org.elasticsearch.cluster.TimeoutClusterStateListener;
+import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
+import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.elasticsearch.cluster.service.ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING;
+import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
+
+public class ClusterApplierService extends AbstractLifecycleComponent implements ClusterApplier {
+
+ public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";
+
+ private final ClusterSettings clusterSettings;
+ protected final ThreadPool threadPool;
+
+ private volatile TimeValue slowTaskLoggingThreshold;
+
+ private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
+
+ /**
+ * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine
+ */
+ private final Collection highPriorityStateAppliers = new CopyOnWriteArrayList<>();
+ private final Collection normalPriorityStateAppliers = new CopyOnWriteArrayList<>();
+ private final Collection lowPriorityStateAppliers = new CopyOnWriteArrayList<>();
+ private final Iterable clusterStateAppliers = Iterables.concat(highPriorityStateAppliers,
+ normalPriorityStateAppliers, lowPriorityStateAppliers);
+
+ private final Collection clusterStateListeners = new CopyOnWriteArrayList<>();
+ private final Collection timeoutClusterStateListeners =
+ Collections.newSetFromMap(new ConcurrentHashMap());
+
+ private final LocalNodeMasterListeners localNodeMasterListeners;
+
+ private final Queue onGoingTimeouts = ConcurrentCollections.newQueue();
+
+ private final AtomicReference state; // last applied state
+
+ private NodeConnectionsService nodeConnectionsService;
+
+ public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
+ super(settings);
+ this.clusterSettings = clusterSettings;
+ this.threadPool = threadPool;
+ this.state = new AtomicReference<>();
+ this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
+ this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
+ }
+
+ public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
+ this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
+ }
+
+ public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
+ assert this.nodeConnectionsService == null : "nodeConnectionsService is already set";
+ this.nodeConnectionsService = nodeConnectionsService;
+ }
+
+ public void setInitialState(ClusterState initialState) {
+ if (lifecycle.started()) {
+ throw new IllegalStateException("can't set initial state when started");
+ }
+ assert state.get() == null : "state is already set";
+ state.set(initialState);
+ }
+
+ @Override
+ protected synchronized void doStart() {
+ Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
+ Objects.requireNonNull(state.get(), "please set initial state before starting");
+ addListener(localNodeMasterListeners);
+ threadPoolExecutor = EsExecutors.newSinglePrioritizing(CLUSTER_UPDATE_THREAD_NAME,
+ daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler());
+ }
+
+ class UpdateTask extends SourcePrioritizedRunnable implements Function {
+ final ClusterStateTaskListener listener;
+ final Function updateFunction;
+
+ UpdateTask(Priority priority, String source, ClusterStateTaskListener listener,
+ Function updateFunction) {
+ super(priority, source);
+ this.listener = listener;
+ this.updateFunction = updateFunction;
+ }
+
+ @Override
+ public ClusterState apply(ClusterState clusterState) {
+ return updateFunction.apply(clusterState);
+ }
+
+ @Override
+ public void run() {
+ runTask(this);
+ }
+ }
+
+ @Override
+ protected synchronized void doStop() {
+ for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
+ onGoingTimeout.cancel();
+ try {
+ onGoingTimeout.cancel();
+ onGoingTimeout.listener.onClose();
+ } catch (Exception ex) {
+ logger.debug("failed to notify listeners on shutdown", ex);
+ }
+ }
+ ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS);
+ // close timeout listeners that did not have an ongoing timeout
+ timeoutClusterStateListeners.forEach(TimeoutClusterStateListener::onClose);
+ removeListener(localNodeMasterListeners);
+ }
+
+ @Override
+ protected synchronized void doClose() {
+ }
+
+ /**
+ * The current cluster state.
+ * Should be renamed to appliedClusterState
+ */
+ public ClusterState state() {
+ assert assertNotCalledFromClusterStateApplier("the applied cluster state is not yet available");
+ ClusterState clusterState = this.state.get();
+ assert clusterState != null : "initial cluster state not set yet";
+ return clusterState;
+ }
+
+ /**
+ * Adds a high priority applier of updated cluster states.
+ */
+ public void addHighPriorityApplier(ClusterStateApplier applier) {
+ highPriorityStateAppliers.add(applier);
+ }
+
+ /**
+ * Adds an applier which will be called after all high priority and normal appliers have been called.
+ */
+ public void addLowPriorityApplier(ClusterStateApplier applier) {
+ lowPriorityStateAppliers.add(applier);
+ }
+
+ /**
+ * Adds a applier of updated cluster states.
+ */
+ public void addStateApplier(ClusterStateApplier applier) {
+ normalPriorityStateAppliers.add(applier);
+ }
+
+ /**
+ * Removes an applier of updated cluster states.
+ */
+ public void removeApplier(ClusterStateApplier applier) {
+ normalPriorityStateAppliers.remove(applier);
+ highPriorityStateAppliers.remove(applier);
+ lowPriorityStateAppliers.remove(applier);
+ }
+
+ /**
+ * Add a listener for updated cluster states
+ */
+ public void addListener(ClusterStateListener listener) {
+ clusterStateListeners.add(listener);
+ }
+
+ /**
+ * Removes a listener for updated cluster states.
+ */
+ public void removeListener(ClusterStateListener listener) {
+ clusterStateListeners.remove(listener);
+ }
+
+ /**
+ * Removes a timeout listener for updated cluster states.
+ */
+ public void removeTimeoutListener(TimeoutClusterStateListener listener) {
+ timeoutClusterStateListeners.remove(listener);
+ for (Iterator it = onGoingTimeouts.iterator(); it.hasNext(); ) {
+ NotifyTimeout timeout = it.next();
+ if (timeout.listener.equals(listener)) {
+ timeout.cancel();
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * Add a listener for on/off local node master events
+ */
+ public void addLocalNodeMasterListener(LocalNodeMasterListener listener) {
+ localNodeMasterListeners.add(listener);
+ }
+
+ /**
+ * Remove the given listener for on/off local master events
+ */
+ public void removeLocalNodeMasterListener(LocalNodeMasterListener listener) {
+ localNodeMasterListeners.remove(listener);
+ }
+
+ /**
+ * Adds a cluster state listener that is expected to be removed during a short period of time.
+ * If provided, the listener will be notified once a specific time has elapsed.
+ *
+ * NOTE: the listener is not removed on timeout. This is the responsibility of the caller.
+ */
+ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutClusterStateListener listener) {
+ if (lifecycle.stoppedOrClosed()) {
+ listener.onClose();
+ return;
+ }
+ // call the post added notification on the same event thread
+ try {
+ threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
+ @Override
+ public void run() {
+ if (timeout != null) {
+ NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
+ notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout);
+ onGoingTimeouts.add(notifyTimeout);
+ }
+ timeoutClusterStateListeners.add(listener);
+ listener.postAdded();
+ }
+ });
+ } catch (EsRejectedExecutionException e) {
+ if (lifecycle.stoppedOrClosed()) {
+ listener.onClose();
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ public void runOnApplierThread(final String source, Consumer clusterStateConsumer,
+ final ClusterStateTaskListener listener, Priority priority) {
+ submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority),
+ (clusterState) -> {
+ clusterStateConsumer.accept(clusterState);
+ return clusterState;
+ },
+ listener);
+ }
+
+ public void runOnApplierThread(final String source, Consumer clusterStateConsumer,
+ final ClusterStateTaskListener listener) {
+ runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
+ }
+
+ @Override
+ public void onNewClusterState(final String source, final java.util.function.Supplier clusterStateSupplier,
+ final ClusterStateTaskListener listener) {
+ Function applyFunction = currentState -> {
+ ClusterState nextState = clusterStateSupplier.get();
+ if (nextState != null) {
+ return nextState;
+ } else {
+ return currentState;
+ }
+ };
+ submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
+ }
+
+ private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
+ final Function executor,
+ final ClusterStateTaskListener listener) {
+ if (!lifecycle.started()) {
+ return;
+ }
+ try {
+ UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
+ if (config.timeout() != null) {
+ threadPoolExecutor.execute(updateTask, config.timeout(),
+ () -> threadPool.generic().execute(
+ () -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))));
+ } else {
+ threadPoolExecutor.execute(updateTask);
+ }
+ } catch (EsRejectedExecutionException e) {
+ // ignore cases where we are shutting down..., there is really nothing interesting
+ // to be done here...
+ if (!lifecycle.stoppedOrClosed()) {
+ throw e;
+ }
+ }
+ }
+
+ /** asserts that the current thread is the cluster state update thread */
+ public static boolean assertClusterStateUpdateThread() {
+ assert Thread.currentThread().getName().contains(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME) :
+ "not called from the cluster state update thread";
+ return true;
+ }
+
+ /** asserts that the current thread is NOT the cluster state update thread */
+ public static boolean assertNotClusterStateUpdateThread(String reason) {
+ assert Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME) == false :
+ "Expected current thread [" + Thread.currentThread() + "] to not be the cluster state update thread. Reason: [" + reason + "]";
+ return true;
+ }
+
+ /** asserts that the current stack trace does NOT involve a cluster state applier */
+ private static boolean assertNotCalledFromClusterStateApplier(String reason) {
+ if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME)) {
+ for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
+ final String className = element.getClassName();
+ final String methodName = element.getMethodName();
+ if (className.equals(ClusterStateObserver.class.getName())) {
+ // people may start an observer from an applier
+ return true;
+ } else if (className.equals(ClusterApplierService.class.getName())
+ && methodName.equals("callClusterStateAppliers")) {
+ throw new AssertionError("should not be called by a cluster state applier. reason [" + reason + "]");
+ }
+ }
+ }
+ return true;
+ }
+
+ protected void runTask(UpdateTask task) {
+ if (!lifecycle.started()) {
+ logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
+ return;
+ }
+
+ logger.debug("processing [{}]: execute", task.source);
+ final ClusterState previousClusterState = state.get();
+
+ long startTimeNS = currentTimeInNanos();
+ final ClusterState newClusterState;
+ try {
+ newClusterState = task.apply(previousClusterState);
+ } catch (Exception e) {
+ TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+ if (logger.isTraceEnabled()) {
+ logger.trace(
+ (Supplier>) () -> new ParameterizedMessage(
+ "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
+ executionTime,
+ previousClusterState.version(),
+ task.source,
+ previousClusterState.nodes(),
+ previousClusterState.routingTable(),
+ previousClusterState.getRoutingNodes()),
+ e);
+ }
+ warnAboutSlowTaskIfNeeded(executionTime, task.source);
+ task.listener.onFailure(task.source, e);
+ return;
+ }
+
+ if (previousClusterState == newClusterState) {
+ task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
+ TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+ logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
+ warnAboutSlowTaskIfNeeded(executionTime, task.source);
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
+ } else if (logger.isDebugEnabled()) {
+ logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
+ }
+ try {
+ applyChanges(task, previousClusterState, newClusterState);
+ TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+ logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
+ executionTime, newClusterState.version(),
+ newClusterState.stateUUID());
+ warnAboutSlowTaskIfNeeded(executionTime, task.source);
+ } catch (Exception e) {
+ TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
+ final long version = newClusterState.version();
+ final String stateUUID = newClusterState.stateUUID();
+ final String fullState = newClusterState.toString();
+ logger.warn(
+ (Supplier>) () -> new ParameterizedMessage(
+ "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
+ executionTime,
+ version,
+ stateUUID,
+ task.source,
+ fullState),
+ e);
+ // TODO: do we want to call updateTask.onFailure here?
+ }
+ }
+ }
+
+ private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) {
+ ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
+ // new cluster state, notify all listeners
+ final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
+ if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
+ String summary = nodesDelta.shortSummary();
+ if (summary.length() > 0) {
+ logger.info("{}, reason: {}", summary, task.source);
+ }
+ }
+
+ nodeConnectionsService.connectToNodes(newClusterState.nodes());
+
+ logger.debug("applying cluster state version {}", newClusterState.version());
+ try {
+ // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
+ if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
+ final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
+ clusterSettings.applySettings(incomingSettings);
+ }
+ } catch (Exception ex) {
+ logger.warn("failed to apply cluster settings", ex);
+ }
+
+ logger.debug("apply cluster state with version {}", newClusterState.version());
+ callClusterStateAppliers(clusterChangedEvent);
+
+ nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
+
+ logger.debug("set locally applied cluster state to version {}", newClusterState.version());
+ state.set(newClusterState);
+
+ callClusterStateListeners(clusterChangedEvent);
+
+ task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
+ }
+
+ private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
+ clusterStateAppliers.forEach(applier -> {
+ try {
+ logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
+ applier.applyClusterState(clusterChangedEvent);
+ } catch (Exception ex) {
+ logger.warn("failed to notify ClusterStateApplier", ex);
+ }
+ });
+ }
+
+ private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent) {
+ Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
+ try {
+ logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
+ listener.clusterChanged(clusterChangedEvent);
+ } catch (Exception ex) {
+ logger.warn("failed to notify ClusterStateListener", ex);
+ }
+ });
+ }
+
+ private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
+ private final ClusterStateTaskListener listener;
+ private final Logger logger;
+
+ SafeClusterStateTaskListener(ClusterStateTaskListener listener, Logger logger) {
+ this.listener = listener;
+ this.logger = logger;
+ }
+
+ @Override
+ public void onFailure(String source, Exception e) {
+ try {
+ listener.onFailure(source, e);
+ } catch (Exception inner) {
+ inner.addSuppressed(e);
+ logger.error(
+ (Supplier>) () -> new ParameterizedMessage(
+ "exception thrown by listener notifying of failure from [{}]", source), inner);
+ }
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ try {
+ listener.clusterStateProcessed(source, oldState, newState);
+ } catch (Exception e) {
+ logger.error(
+ (Supplier>) () -> new ParameterizedMessage(
+ "exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" +
+ "{}\nnew cluster state:\n{}",
+ source, oldState, newState),
+ e);
+ }
+ }
+ }
+
+ protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
+ if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
+ logger.warn("cluster state applier task [{}] took [{}] above the warn threshold of {}", source, executionTime,
+ slowTaskLoggingThreshold);
+ }
+ }
+
+ class NotifyTimeout implements Runnable {
+ final TimeoutClusterStateListener listener;
+ final TimeValue timeout;
+ volatile ScheduledFuture future;
+
+ NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
+ this.listener = listener;
+ this.timeout = timeout;
+ }
+
+ public void cancel() {
+ FutureUtils.cancel(future);
+ }
+
+ @Override
+ public void run() {
+ if (future != null && future.isCancelled()) {
+ return;
+ }
+ if (lifecycle.stoppedOrClosed()) {
+ listener.onClose();
+ } else {
+ listener.onTimeout(this.timeout);
+ }
+ // note, we rely on the listener to remove itself in case of timeout if needed
+ }
+ }
+
+ private static class LocalNodeMasterListeners implements ClusterStateListener {
+
+ private final List listeners = new CopyOnWriteArrayList<>();
+ private final ThreadPool threadPool;
+ private volatile boolean master = false;
+
+ private LocalNodeMasterListeners(ThreadPool threadPool) {
+ this.threadPool = threadPool;
+ }
+
+ @Override
+ public void clusterChanged(ClusterChangedEvent event) {
+ if (!master && event.localNodeMaster()) {
+ master = true;
+ for (LocalNodeMasterListener listener : listeners) {
+ java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
+ executor.execute(new OnMasterRunnable(listener));
+ }
+ return;
+ }
+
+ if (master && !event.localNodeMaster()) {
+ master = false;
+ for (LocalNodeMasterListener listener : listeners) {
+ java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
+ executor.execute(new OffMasterRunnable(listener));
+ }
+ }
+ }
+
+ private void add(LocalNodeMasterListener listener) {
+ listeners.add(listener);
+ }
+
+ private void remove(LocalNodeMasterListener listener) {
+ listeners.remove(listener);
+ }
+
+ private void clear() {
+ listeners.clear();
+ }
+ }
+
+ private static class OnMasterRunnable implements Runnable {
+
+ private final LocalNodeMasterListener listener;
+
+ private OnMasterRunnable(LocalNodeMasterListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void run() {
+ listener.onMaster();
+ }
+ }
+
+ private static class OffMasterRunnable implements Runnable {
+
+ private final LocalNodeMasterListener listener;
+
+ private OffMasterRunnable(LocalNodeMasterListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void run() {
+ listener.offMaster();
+ }
+ }
+
+ // this one is overridden in tests so we can control time
+ protected long currentTimeInNanos() {
+ return System.nanoTime();
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
index dd7d7bcbd60..18445e62c7e 100644
--- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
@@ -19,264 +19,82 @@
package org.elasticsearch.cluster.service;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
-import org.elasticsearch.cluster.AckedClusterStateTaskListener;
-import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
-import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
-import org.elasticsearch.cluster.block.ClusterBlock;
-import org.elasticsearch.cluster.block.ClusterBlocks;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting;
-import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.CountDown;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
-import org.elasticsearch.common.util.iterable.Iterables;
-import org.elasticsearch.discovery.Discovery;
-import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.threadpool.ThreadPool;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
import java.util.Map;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.UnaryOperator;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
public class ClusterService extends AbstractLifecycleComponent {
+ private final MasterService masterService;
+
+ private final ClusterApplierService clusterApplierService;
+
public static final Setting CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING =
Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds(30),
Property.Dynamic, Property.NodeScope);
- public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
- private final ThreadPool threadPool;
private final ClusterName clusterName;
- private final Supplier localNodeSupplier;
-
- private BiConsumer clusterStatePublisher;
private final OperationRouting operationRouting;
private final ClusterSettings clusterSettings;
- private TimeValue slowTaskLoggingThreshold;
-
- private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
- private volatile ClusterServiceTaskBatcher taskBatcher;
-
- /**
- * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine
- */
- private final Collection highPriorityStateAppliers = new CopyOnWriteArrayList<>();
- private final Collection normalPriorityStateAppliers = new CopyOnWriteArrayList<>();
- private final Collection lowPriorityStateAppliers = new CopyOnWriteArrayList<>();
- private final Iterable clusterStateAppliers = Iterables.concat(highPriorityStateAppliers,
- normalPriorityStateAppliers, lowPriorityStateAppliers);
-
- private final Collection clusterStateListeners = new CopyOnWriteArrayList<>();
- private final Collection timeoutClusterStateListeners =
- Collections.newSetFromMap(new ConcurrentHashMap());
-
- private final LocalNodeMasterListeners localNodeMasterListeners;
-
- private final Queue onGoingTimeouts = ConcurrentCollections.newQueue();
-
- private final AtomicReference state;
-
- private final ClusterBlocks.Builder initialBlocks;
-
- private NodeConnectionsService nodeConnectionsService;
-
- private DiscoverySettings discoverySettings;
-
- public ClusterService(Settings settings,
- ClusterSettings clusterSettings, ThreadPool threadPool, Supplier localNodeSupplier) {
+ public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
- this.localNodeSupplier = localNodeSupplier;
+ this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
+ this.masterService = new MasterService(settings, threadPool);
this.operationRouting = new OperationRouting(settings, clusterSettings);
- this.threadPool = threadPool;
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
- // will be replaced on doStart.
- this.state = new AtomicReference<>(ClusterState.builder(clusterName).build());
-
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
- this::setSlowTaskLoggingThreshold);
-
- this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
-
- localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
-
- initialBlocks = ClusterBlocks.builder();
+ this::setSlowTaskLoggingThreshold);
}
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
- this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
- }
-
- public synchronized void setClusterStatePublisher(BiConsumer publisher) {
- clusterStatePublisher = publisher;
- }
-
- private void updateState(UnaryOperator updateFunction) {
- this.state.getAndUpdate(updateFunction);
+ masterService.setSlowTaskLoggingThreshold(slowTaskLoggingThreshold);
+ clusterApplierService.setSlowTaskLoggingThreshold(slowTaskLoggingThreshold);
}
public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
- assert this.nodeConnectionsService == null : "nodeConnectionsService is already set";
- this.nodeConnectionsService = nodeConnectionsService;
- }
-
- /**
- * Adds an initial block to be set on the first cluster state created.
- */
- public synchronized void addInitialStateBlock(ClusterBlock block) throws IllegalStateException {
- if (lifecycle.started()) {
- throw new IllegalStateException("can't set initial block when started");
- }
- initialBlocks.addGlobalBlock(block);
- }
-
- /**
- * Remove an initial block to be set on the first cluster state created.
- */
- public synchronized void removeInitialStateBlock(ClusterBlock block) throws IllegalStateException {
- removeInitialStateBlock(block.id());
- }
-
- /**
- * Remove an initial block to be set on the first cluster state created.
- */
- public synchronized void removeInitialStateBlock(int blockId) throws IllegalStateException {
- if (lifecycle.started()) {
- throw new IllegalStateException("can't set initial block when started");
- }
- initialBlocks.removeGlobalBlock(blockId);
+ clusterApplierService.setNodeConnectionsService(nodeConnectionsService);
}
@Override
protected synchronized void doStart() {
- Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
- Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
- Objects.requireNonNull(discoverySettings, "please set discovery settings before starting");
- addListener(localNodeMasterListeners);
- DiscoveryNode localNode = localNodeSupplier.get();
- assert localNode != null;
- updateState(state -> {
- assert state.nodes().getLocalNodeId() == null : "local node is already set";
- DiscoveryNodes nodes = DiscoveryNodes.builder(state.nodes()).add(localNode).localNodeId(localNode.getId()).build();
- return ClusterState.builder(state).nodes(nodes).blocks(initialBlocks).build();
- });
- this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME,
- daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler());
- this.taskBatcher = new ClusterServiceTaskBatcher(logger, threadPoolExecutor);
+ clusterApplierService.start();
+ masterService.start();
}
@Override
protected synchronized void doStop() {
- for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
- onGoingTimeout.cancel();
- try {
- onGoingTimeout.cancel();
- onGoingTimeout.listener.onClose();
- } catch (Exception ex) {
- logger.debug("failed to notify listeners on shutdown", ex);
- }
- }
- ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS);
- // close timeout listeners that did not have an ongoing timeout
- timeoutClusterStateListeners.forEach(TimeoutClusterStateListener::onClose);
- removeListener(localNodeMasterListeners);
+ masterService.stop();
+ clusterApplierService.stop();
}
@Override
protected synchronized void doClose() {
- }
-
- class ClusterServiceTaskBatcher extends TaskBatcher {
-
- ClusterServiceTaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
- super(logger, threadExecutor);
- }
-
- @Override
- protected void onTimeout(List extends BatchedTask> tasks, TimeValue timeout) {
- threadPool.generic().execute(
- () -> tasks.forEach(
- task -> ((UpdateTask) task).listener.onFailure(task.source,
- new ProcessClusterEventTimeoutException(timeout, task.source))));
- }
-
- @Override
- protected void run(Object batchingKey, List extends BatchedTask> tasks, String tasksSummary) {
- ClusterStateTaskExecutor