Separate cluster update tasks that are published from those that are not (#21912)
This commit factors out the cluster state update tasks that are published (ClusterStateUpdateTask) from those that are not (LocalClusterUpdateTask), serving as a basis for future refactorings to separate the publishing mechanism out of ClusterService.
This commit is contained in:
parent
2511442a92
commit
baea17b53f
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
|
@ -85,37 +86,55 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
protected void masterOperation(Task task, final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) {
|
||||
if (request.waitForEvents() != null) {
|
||||
final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return currentState;
|
||||
}
|
||||
if (request.local()) {
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new LocalClusterUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
|
||||
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
|
||||
request.timeout(newTimeout);
|
||||
executeHealth(request, listener);
|
||||
}
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
|
||||
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
|
||||
request.timeout(newTimeout);
|
||||
executeHealth(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents());
|
||||
doExecute(task, request, listener);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
|
||||
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
|
||||
request.timeout(newTimeout);
|
||||
executeHealth(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return !request.local();
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents());
|
||||
doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
executeHealth(request, listener);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
*/
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -27,10 +29,10 @@ public interface ClusterStateTaskExecutor<T> {
|
|||
* Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state
|
||||
* should be changed.
|
||||
*/
|
||||
BatchResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
|
||||
ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
|
||||
|
||||
/**
|
||||
* indicates whether this task should only run if current node is master
|
||||
* indicates whether this executor should only run if the current node is master
|
||||
*/
|
||||
default boolean runOnlyOnMaster() {
|
||||
return true;
|
||||
|
@ -68,18 +70,22 @@ public interface ClusterStateTaskExecutor<T> {
|
|||
* Represents the result of a batched execution of cluster state update tasks
|
||||
* @param <T> the type of the cluster state update task
|
||||
*/
|
||||
class BatchResult<T> {
|
||||
class ClusterTasksResult<T> {
|
||||
public final boolean noMaster;
|
||||
@Nullable
|
||||
public final ClusterState resultingState;
|
||||
public final Map<T, TaskResult> executionResults;
|
||||
|
||||
/**
|
||||
* Construct an execution result instance with a correspondence between the tasks and their execution result
|
||||
* @param noMaster whether this node steps down as master or has lost connection to the master
|
||||
* @param resultingState the resulting cluster state
|
||||
* @param executionResults the correspondence between tasks and their outcome
|
||||
*/
|
||||
BatchResult(ClusterState resultingState, Map<T, TaskResult> executionResults) {
|
||||
ClusterTasksResult(boolean noMaster, ClusterState resultingState, Map<T, TaskResult> executionResults) {
|
||||
this.resultingState = resultingState;
|
||||
this.executionResults = executionResults;
|
||||
this.noMaster = noMaster;
|
||||
}
|
||||
|
||||
public static <T> Builder<T> builder() {
|
||||
|
@ -117,8 +123,13 @@ public interface ClusterStateTaskExecutor<T> {
|
|||
return this;
|
||||
}
|
||||
|
||||
public BatchResult<T> build(ClusterState resultingState) {
|
||||
return new BatchResult<>(resultingState, executionResults);
|
||||
public ClusterTasksResult<T> build(ClusterState resultingState) {
|
||||
return new ClusterTasksResult<>(false, resultingState, executionResults);
|
||||
}
|
||||
|
||||
ClusterTasksResult<T> build(ClusterTasksResult<T> result, ClusterState previousState) {
|
||||
return new ClusterTasksResult<>(result.noMaster, result.resultingState == null ? previousState : result.resultingState,
|
||||
executionResults);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.List;
|
|||
/**
|
||||
* A task that can update the cluster state.
|
||||
*/
|
||||
public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {
|
||||
public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {
|
||||
|
||||
private final Priority priority;
|
||||
|
||||
|
@ -41,9 +41,9 @@ public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig,
|
|||
}
|
||||
|
||||
@Override
|
||||
public final BatchResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks) throws Exception {
|
||||
public final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks) throws Exception {
|
||||
ClusterState result = execute(currentState);
|
||||
return BatchResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
|
||||
return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,4 +75,13 @@ public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig,
|
|||
public Priority priority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marked as final as cluster state update tasks should only run on master.
|
||||
* For local requests, use {@link LocalClusterUpdateTask} instead.
|
||||
*/
|
||||
@Override
|
||||
public final boolean runOnlyOnMaster() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used to apply state updates on nodes that are not necessarily master
|
||||
*/
|
||||
public abstract class LocalClusterUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<LocalClusterUpdateTask>,
|
||||
ClusterStateTaskListener {
|
||||
|
||||
private final Priority priority;
|
||||
|
||||
public LocalClusterUpdateTask() {
|
||||
this(Priority.NORMAL);
|
||||
}
|
||||
|
||||
public LocalClusterUpdateTask(Priority priority) {
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
public abstract ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception;
|
||||
|
||||
@Override
|
||||
public final ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState,
|
||||
List<LocalClusterUpdateTask> tasks) throws Exception {
|
||||
assert tasks.size() == 1 && tasks.get(0) == this : "expected one-element task list containing current object but was " + tasks;
|
||||
ClusterTasksResult<LocalClusterUpdateTask> result = execute(currentState);
|
||||
return ClusterTasksResult.<LocalClusterUpdateTask>builder().successes(tasks).build(result, currentState);
|
||||
}
|
||||
|
||||
/**
|
||||
* node stepped down as master or has lost connection to the master
|
||||
*/
|
||||
public static ClusterTasksResult<LocalClusterUpdateTask> noMaster() {
|
||||
return new ClusterTasksResult(true, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* no changes were made to the cluster state. Useful to execute a runnable on the cluster state applier thread
|
||||
*/
|
||||
public static ClusterTasksResult<LocalClusterUpdateTask> unchanged() {
|
||||
return new ClusterTasksResult(false, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* locally apply cluster state received from a master
|
||||
*/
|
||||
public static ClusterTasksResult<LocalClusterUpdateTask> newState(ClusterState clusterState) {
|
||||
return new ClusterTasksResult(false, clusterState, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String describeTasks(List<LocalClusterUpdateTask> tasks) {
|
||||
return ""; // one of task, source is enough
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public TimeValue timeout() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Priority priority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -25,10 +25,10 @@ import org.apache.logging.log4j.util.Supplier;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.MasterNodeChangePredicate;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
|
@ -260,8 +260,8 @@ public class ShardStateAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
|
||||
BatchResult.Builder<ShardEntry> batchResultBuilder = BatchResult.builder();
|
||||
public ClusterTasksResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
|
||||
ClusterTasksResult.Builder<ShardEntry> batchResultBuilder = ClusterTasksResult.builder();
|
||||
List<ShardEntry> tasksToBeApplied = new ArrayList<>();
|
||||
List<FailedShard> failedShardsToBeApplied = new ArrayList<>();
|
||||
List<StaleShard> staleShardsToBeApplied = new ArrayList<>();
|
||||
|
@ -394,8 +394,8 @@ public class ShardStateAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
|
||||
BatchResult.Builder<ShardEntry> builder = BatchResult.builder();
|
||||
public ClusterTasksResult<ShardEntry> execute(ClusterState currentState, List<ShardEntry> tasks) throws Exception {
|
||||
ClusterTasksResult.Builder<ShardEntry> builder = ClusterTasksResult.builder();
|
||||
List<ShardEntry> tasksToBeApplied = new ArrayList<>();
|
||||
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
|
||||
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
|
||||
|
|
|
@ -26,9 +26,9 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
|
||||
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -64,8 +64,8 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
private final ClusterService clusterService;
|
||||
private final IndicesService indicesService;
|
||||
|
||||
final ClusterStateTaskExecutor<RefreshTask> refreshExecutor = new RefreshTaskExecutor();
|
||||
final ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> putMappingExecutor = new PutMappingExecutor();
|
||||
final RefreshTaskExecutor refreshExecutor = new RefreshTaskExecutor();
|
||||
final PutMappingExecutor putMappingExecutor = new PutMappingExecutor();
|
||||
|
||||
|
||||
@Inject
|
||||
|
@ -92,9 +92,9 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
|
||||
class RefreshTaskExecutor implements ClusterStateTaskExecutor<RefreshTask> {
|
||||
@Override
|
||||
public BatchResult<RefreshTask> execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
|
||||
public ClusterTasksResult<RefreshTask> execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
|
||||
ClusterState newClusterState = executeRefresh(currentState, tasks);
|
||||
return BatchResult.<RefreshTask>builder().successes(tasks).build(newClusterState);
|
||||
return ClusterTasksResult.<RefreshTask>builder().successes(tasks).build(newClusterState);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -211,10 +211,10 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
|
||||
class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
|
||||
@Override
|
||||
public BatchResult<PutMappingClusterStateUpdateRequest> execute(ClusterState currentState,
|
||||
List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
|
||||
public ClusterTasksResult<PutMappingClusterStateUpdateRequest> execute(ClusterState currentState,
|
||||
List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
|
||||
Map<Index, MapperService> indexMapperServices = new HashMap<>();
|
||||
BatchResult.Builder<PutMappingClusterStateUpdateRequest> builder = BatchResult.builder();
|
||||
ClusterTasksResult.Builder<PutMappingClusterStateUpdateRequest> builder = ClusterTasksResult.builder();
|
||||
try {
|
||||
for (PutMappingClusterStateUpdateRequest request : tasks) {
|
||||
try {
|
||||
|
|
|
@ -31,9 +31,8 @@ import org.elasticsearch.cluster.ClusterStateApplier;
|
|||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor.BatchResult;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalNodeMasterListener;
|
||||
import org.elasticsearch.cluster.NodeConnectionsService;
|
||||
import org.elasticsearch.cluster.TimeoutClusterStateListener;
|
||||
|
@ -64,6 +63,7 @@ import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
|||
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -137,6 +137,8 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
|
||||
private NodeConnectionsService nodeConnectionsService;
|
||||
|
||||
private DiscoverySettings discoverySettings;
|
||||
|
||||
public ClusterService(Settings settings,
|
||||
ClusterSettings clusterSettings, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
|
@ -214,6 +216,7 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
|
||||
Objects.requireNonNull(state().nodes().getLocalNode(), "please set the local node before starting");
|
||||
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
|
||||
Objects.requireNonNull(discoverySettings, "please set discovery settings before starting");
|
||||
addListener(localNodeMasterListeners);
|
||||
updateState(state -> ClusterState.builder(state).blocks(initialBlocks).build());
|
||||
this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME),
|
||||
|
@ -379,11 +382,11 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
* task
|
||||
*
|
||||
*/
|
||||
public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
|
||||
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener> void submitStateUpdateTask(
|
||||
final String source, final T updateTask) {
|
||||
submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Submits a cluster state update task; submitted updates will be
|
||||
* batched across the same instance of executor. The exact batching
|
||||
|
@ -573,6 +576,10 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
return clusterName;
|
||||
}
|
||||
|
||||
public void setDiscoverySettings(DiscoverySettings discoverySettings) {
|
||||
this.discoverySettings = discoverySettings;
|
||||
}
|
||||
|
||||
abstract static class SourcePrioritizedRunnable extends PrioritizedRunnable {
|
||||
protected final String source;
|
||||
|
||||
|
@ -643,29 +650,28 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
}
|
||||
|
||||
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
|
||||
BatchResult<Object> batchResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
|
||||
ClusterState newClusterState = batchResult.resultingState;
|
||||
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
|
||||
// extract those that are waiting for results
|
||||
List<UpdateTask> nonFailedTasks = new ArrayList<>();
|
||||
for (UpdateTask updateTask : taskInputs.updateTasks) {
|
||||
assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
|
||||
assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
|
||||
final ClusterStateTaskExecutor.TaskResult taskResult =
|
||||
batchResult.executionResults.get(updateTask.task);
|
||||
clusterTasksResult.executionResults.get(updateTask.task);
|
||||
if (taskResult.isSuccess()) {
|
||||
nonFailedTasks.add(updateTask);
|
||||
}
|
||||
}
|
||||
newClusterState = patchVersions(previousClusterState, newClusterState);
|
||||
ClusterState newClusterState = patchVersionsAndNoMasterBlocks(previousClusterState, clusterTasksResult);
|
||||
|
||||
return new TaskOutputs(taskInputs, previousClusterState, newClusterState, nonFailedTasks,
|
||||
batchResult.executionResults);
|
||||
clusterTasksResult.executionResults);
|
||||
}
|
||||
|
||||
private BatchResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) {
|
||||
BatchResult<Object> batchResult;
|
||||
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) {
|
||||
ClusterTasksResult<Object> clusterTasksResult;
|
||||
try {
|
||||
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
|
||||
batchResult = taskInputs.executor.execute(previousClusterState, inputs);
|
||||
clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
|
||||
} catch (Exception e) {
|
||||
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -681,42 +687,70 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
e);
|
||||
}
|
||||
warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
|
||||
batchResult = BatchResult.builder()
|
||||
clusterTasksResult = ClusterTasksResult.builder()
|
||||
.failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
|
||||
.build(previousClusterState);
|
||||
}
|
||||
|
||||
assert batchResult.executionResults != null;
|
||||
assert batchResult.executionResults.size() == taskInputs.updateTasks.size()
|
||||
assert clusterTasksResult.executionResults != null;
|
||||
assert clusterTasksResult.executionResults.size() == taskInputs.updateTasks.size()
|
||||
: String.format(Locale.ROOT, "expected [%d] task result%s but was [%d]", taskInputs.updateTasks.size(),
|
||||
taskInputs.updateTasks.size() == 1 ? "" : "s", batchResult.executionResults.size());
|
||||
taskInputs.updateTasks.size() == 1 ? "" : "s", clusterTasksResult.executionResults.size());
|
||||
boolean assertsEnabled = false;
|
||||
assert (assertsEnabled = true);
|
||||
if (assertsEnabled) {
|
||||
for (UpdateTask updateTask : taskInputs.updateTasks) {
|
||||
assert batchResult.executionResults.containsKey(updateTask.task) :
|
||||
assert clusterTasksResult.executionResults.containsKey(updateTask.task) :
|
||||
"missing task result for " + updateTask;
|
||||
}
|
||||
}
|
||||
|
||||
return batchResult;
|
||||
return clusterTasksResult;
|
||||
}
|
||||
|
||||
private ClusterState patchVersions(ClusterState previousClusterState, ClusterState newClusterState) {
|
||||
if (previousClusterState != newClusterState) {
|
||||
if (newClusterState.nodes().isLocalNodeElectedMaster()) {
|
||||
// only the master controls the version numbers
|
||||
Builder builder = ClusterState.builder(newClusterState).incrementVersion();
|
||||
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
|
||||
builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
|
||||
.version(newClusterState.routingTable().version() + 1).build());
|
||||
}
|
||||
if (previousClusterState.metaData() != newClusterState.metaData()) {
|
||||
builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
|
||||
}
|
||||
newClusterState = builder.build();
|
||||
private ClusterState patchVersionsAndNoMasterBlocks(ClusterState previousClusterState, ClusterTasksResult<Object> executionResult) {
|
||||
ClusterState newClusterState = executionResult.resultingState;
|
||||
|
||||
if (executionResult.noMaster) {
|
||||
assert newClusterState == previousClusterState : "state can only be changed by ClusterService when noMaster = true";
|
||||
if (previousClusterState.nodes().getMasterNodeId() != null) {
|
||||
// remove block if it already exists before adding new one
|
||||
assert previousClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock().id()) == false :
|
||||
"NO_MASTER_BLOCK should only be added by ClusterService";
|
||||
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(previousClusterState.blocks())
|
||||
.addGlobalBlock(discoverySettings.getNoMasterBlock())
|
||||
.build();
|
||||
|
||||
DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(previousClusterState.nodes()).masterNodeId(null).build();
|
||||
newClusterState = ClusterState.builder(previousClusterState)
|
||||
.blocks(clusterBlocks)
|
||||
.nodes(discoveryNodes)
|
||||
.build();
|
||||
}
|
||||
} else if (newClusterState.nodes().isLocalNodeElectedMaster() && previousClusterState != newClusterState) {
|
||||
// only the master controls the version numbers
|
||||
Builder builder = ClusterState.builder(newClusterState).incrementVersion();
|
||||
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
|
||||
builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
|
||||
.version(newClusterState.routingTable().version() + 1).build());
|
||||
}
|
||||
if (previousClusterState.metaData() != newClusterState.metaData()) {
|
||||
builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
|
||||
}
|
||||
|
||||
// remove the no master block, if it exists
|
||||
if (newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock().id())) {
|
||||
builder.blocks(ClusterBlocks.builder().blocks(newClusterState.blocks())
|
||||
.removeGlobalBlock(discoverySettings.getNoMasterBlock().id()));
|
||||
}
|
||||
|
||||
newClusterState = builder.build();
|
||||
}
|
||||
|
||||
assert newClusterState.nodes().getMasterNodeId() == null ||
|
||||
newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock().id()) == false :
|
||||
"cluster state with master node must not have NO_MASTER_BLOCK";
|
||||
|
||||
return newClusterState;
|
||||
}
|
||||
|
||||
|
@ -801,14 +835,16 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
|
||||
taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);
|
||||
|
||||
try {
|
||||
taskOutputs.clusterStatePublished(clusterChangedEvent);
|
||||
} catch (Exception e) {
|
||||
logger.error(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"exception thrown while notifying executor of new cluster state publication [{}]",
|
||||
taskInputs.summary),
|
||||
e);
|
||||
if (newClusterState.nodes().isLocalNodeElectedMaster()) {
|
||||
try {
|
||||
taskOutputs.clusterStatePublished(clusterChangedEvent);
|
||||
} catch (Exception e) {
|
||||
logger.error(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"exception thrown while notifying executor of new cluster state publication [{}]",
|
||||
taskInputs.summary),
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,12 +24,11 @@ import org.apache.logging.log4j.util.Supplier;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
|
@ -59,7 +58,6 @@ public class NodeJoinController extends AbstractComponent {
|
|||
private final ClusterService clusterService;
|
||||
private final AllocationService allocationService;
|
||||
private final ElectMasterService electMaster;
|
||||
private final DiscoverySettings discoverySettings;
|
||||
private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
|
||||
|
||||
// this is set while trying to become a master
|
||||
|
@ -68,12 +66,11 @@ public class NodeJoinController extends AbstractComponent {
|
|||
|
||||
|
||||
public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster,
|
||||
DiscoverySettings discoverySettings, Settings settings) {
|
||||
Settings settings) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.allocationService = allocationService;
|
||||
this.electMaster = electMaster;
|
||||
this.discoverySettings = discoverySettings;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -408,8 +405,9 @@ public class NodeJoinController extends AbstractComponent {
|
|||
class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
|
||||
|
||||
@Override
|
||||
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
|
||||
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();
|
||||
public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
|
||||
final ClusterTasksResult.Builder<DiscoveryNode> results = ClusterTasksResult.builder();
|
||||
|
||||
final DiscoveryNodes currentNodes = currentState.nodes();
|
||||
boolean nodesChanged = false;
|
||||
ClusterState.Builder newState;
|
||||
|
@ -471,8 +469,6 @@ public class NodeJoinController extends AbstractComponent {
|
|||
DiscoveryNodes currentNodes = currentState.nodes();
|
||||
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
|
||||
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
|
||||
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
|
||||
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
|
||||
for (final DiscoveryNode joiningNode : joiningNodes) {
|
||||
final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId());
|
||||
if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) {
|
||||
|
@ -490,7 +486,7 @@ public class NodeJoinController extends AbstractComponent {
|
|||
|
||||
// now trim any left over dead nodes - either left there when the previous master stepped down
|
||||
// or removed by us above
|
||||
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(clusterBlocks).build();
|
||||
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
|
||||
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false,
|
||||
"removed dead nodes on election"));
|
||||
}
|
||||
|
|
|
@ -27,13 +27,12 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -207,25 +206,20 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
nodesFD.setLocalNode(clusterService.localNode());
|
||||
joinThreadControl.start();
|
||||
zenPing.start(this);
|
||||
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
|
||||
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, settings);
|
||||
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startInitialJoin() {
|
||||
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
|
||||
clusterService.submitStateUpdateTask("initial_join", new ClusterStateUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("initial_join", new LocalClusterUpdateTask() {
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
|
||||
joinThreadControl.startNewThreadIfNotRunning();
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -352,7 +346,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
return joinThreadControl.joinThreadActive();
|
||||
}
|
||||
|
||||
|
||||
// used for testing
|
||||
public ClusterState[] pendingClusterStates() {
|
||||
return publishClusterState.pendingStatesQueue().pendingClusterStates();
|
||||
|
@ -408,18 +401,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
// finalize join through the cluster state update thread
|
||||
final DiscoveryNode finalMasterNode = masterNode;
|
||||
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new LocalClusterUpdateTask() {
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
if (!success) {
|
||||
// failed to join. Try again...
|
||||
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
if (currentState.getNodes().getMasterNode() == null) {
|
||||
|
@ -427,7 +415,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
// a valid master.
|
||||
logger.debug("no master node is set, despite of join request completing. retrying pings.");
|
||||
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
|
||||
|
@ -437,7 +425,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
// Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
|
||||
// when the first cluster state arrives.
|
||||
joinThreadControl.markThreadAsDone(currentThread);
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -496,9 +484,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
|
||||
private void submitRejoin(String source) {
|
||||
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
clusterService.submitStateUpdateTask(source, new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
return rejoin(currentState, source);
|
||||
}
|
||||
|
||||
|
@ -554,7 +542,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
|
||||
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
|
||||
final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
|
||||
boolean removed = false;
|
||||
for (final Task task : tasks) {
|
||||
|
@ -568,12 +556,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
if (!removed) {
|
||||
// no nodes to remove, keep the current cluster state
|
||||
return BatchResult.<Task>builder().successes(tasks).build(currentState);
|
||||
return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
|
||||
}
|
||||
|
||||
final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
|
||||
|
||||
final BatchResult.Builder<Task> resultBuilder = BatchResult.<Task>builder().successes(tasks);
|
||||
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
|
||||
if (!electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes())) {
|
||||
rejoin.accept("not enough master nodes");
|
||||
return resultBuilder.build(currentState);
|
||||
|
@ -645,14 +633,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
// We only set the new value. If the master doesn't see enough nodes it will revoke it's mastership.
|
||||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask("zen-disco-mini-master-nodes-changed", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
clusterService.submitStateUpdateTask("zen-disco-min-master-nodes-changed", new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
// check if we have enough master nodes, if not, we need to move into joining the cluster again
|
||||
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
|
||||
return rejoin(currentState, "not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" + minimumMasterNodes + "]");
|
||||
}
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
|
||||
|
@ -685,18 +673,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
logger.info((Supplier<?>) () -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);
|
||||
|
||||
clusterService.submitStateUpdateTask("master_failed (" + masterNode + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
clusterService.submitStateUpdateTask("master_failed (" + masterNode + ")", new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
if (!masterNode.equals(currentState.nodes().getMasterNode())) {
|
||||
// master got switched on us, no need to send anything
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
// flush any pending cluster states from old master, so it will not be set as master again
|
||||
|
@ -710,29 +693,20 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
void processNextPendingClusterState(String reason) {
|
||||
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", new ClusterStateUpdateTask(Priority.URGENT) {
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", new LocalClusterUpdateTask(Priority.URGENT) {
|
||||
ClusterState newClusterState = null;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
newClusterState = publishClusterState.pendingStatesQueue().getNextClusterStateToProcess();
|
||||
|
||||
// all pending states have been processed
|
||||
if (newClusterState == null) {
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master";
|
||||
|
@ -743,7 +717,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
|
||||
if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
// check to see that we monitor the correct master of the cluster
|
||||
|
@ -754,7 +728,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
|
||||
// its a fresh update from the master as we transition from a start of not having a master to having one
|
||||
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
|
||||
return newClusterState;
|
||||
return newState(newClusterState);
|
||||
}
|
||||
|
||||
|
||||
|
@ -784,7 +758,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
builder.metaData(metaDataBuilder);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
return newState(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -962,7 +936,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
return pingResponses;
|
||||
}
|
||||
|
||||
protected ClusterState rejoin(ClusterState clusterState, String reason) {
|
||||
protected ClusterStateTaskExecutor.ClusterTasksResult<LocalClusterUpdateTask> rejoin(ClusterState clusterState, String reason) {
|
||||
|
||||
// *** called from within an cluster state update task *** //
|
||||
assert Thread.currentThread().getName().contains(ClusterService.UPDATE_THREAD_NAME);
|
||||
|
@ -971,29 +945,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
nodesFD.stop();
|
||||
masterFD.stop(reason);
|
||||
|
||||
|
||||
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
|
||||
.addGlobalBlock(discoverySettings.getNoMasterBlock())
|
||||
.build();
|
||||
|
||||
// clean the nodes, we are now not connected to anybody, since we try and reform the cluster
|
||||
DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
|
||||
|
||||
// TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
|
||||
// before a decision is made.
|
||||
joinThreadControl.startNewThreadIfNotRunning();
|
||||
|
||||
return ClusterState.builder(clusterState)
|
||||
.blocks(clusterBlocks)
|
||||
.nodes(discoveryNodes)
|
||||
.build();
|
||||
return LocalClusterUpdateTask.noMaster();
|
||||
}
|
||||
|
||||
private boolean localNodeMaster() {
|
||||
return nodes().isLocalNodeElectedMaster();
|
||||
}
|
||||
|
||||
private ClusterState handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, String reason) {
|
||||
private ClusterStateTaskExecutor.ClusterTasksResult handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, String reason) {
|
||||
assert localClusterState.nodes().isLocalNodeElectedMaster() : "handleAnotherMaster called but current node is not a master";
|
||||
assert Thread.currentThread().getName().contains(ClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
|
||||
|
||||
|
@ -1016,7 +978,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
} catch (Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), e);
|
||||
}
|
||||
return localClusterState;
|
||||
return LocalClusterUpdateTask.unchanged();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1083,12 +1045,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
return;
|
||||
}
|
||||
logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
|
||||
clusterService.submitStateUpdateTask("ping from another master", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
clusterService.submitStateUpdateTask("ping from another master", new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
pingsWhileMaster.set(0);
|
||||
return handleAnotherMaster(currentState, pingRequest.masterNode(), pingRequest.clusterStateVersion(), "node fd ping");
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
if (currentState.nodes().isLocalNodeElectedMaster()) {
|
||||
pingsWhileMaster.set(0);
|
||||
return handleAnotherMaster(currentState, pingRequest.masterNode(), pingRequest.clusterStateVersion(), "node fd ping");
|
||||
} else {
|
||||
return unchanged();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1134,15 +1100,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
class RejoinClusterRequestHandler implements TransportRequestHandler<RejoinClusterRequest> {
|
||||
@Override
|
||||
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
|
||||
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
} catch (Exception e) {
|
||||
|
@ -1186,7 +1147,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
|
||||
/** cleans any running joining thread and calls {@link #rejoin} */
|
||||
public ClusterState stopRunningThreadAndRejoin(ClusterState clusterState, String reason) {
|
||||
public ClusterStateTaskExecutor.ClusterTasksResult<LocalClusterUpdateTask> stopRunningThreadAndRejoin(ClusterState clusterState, String reason) {
|
||||
ClusterService.assertClusterStateThread();
|
||||
currentJoinThread.set(null);
|
||||
return rejoin(clusterState, reason);
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.ClusterName;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
|
@ -283,24 +283,19 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
return;
|
||||
}
|
||||
|
||||
clusterService.submitStateUpdateTask("indices_store ([" + shardId + "] active fully on other nodes)", new ClusterStateUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("indices_store ([" + shardId + "] active fully on other nodes)", new LocalClusterUpdateTask() {
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
if (clusterStateVersion != currentState.getVersion()) {
|
||||
logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterStateVersion);
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
try {
|
||||
indicesService.deleteShardStore("no longer used", shardId, currentState);
|
||||
} catch (Exception ex) {
|
||||
logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} failed to delete unallocated shard, ignoring", shardId), ex);
|
||||
}
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -564,6 +564,7 @@ public class Node implements Closeable {
|
|||
injector.getInstance(ResourceWatcherService.class).start();
|
||||
injector.getInstance(GatewayService.class).start();
|
||||
Discovery discovery = injector.getInstance(Discovery.class);
|
||||
clusterService.setDiscoverySettings(discovery.getDiscoverySettings());
|
||||
clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());
|
||||
clusterService.setClusterStatePublisher(discovery::publish);
|
||||
|
||||
|
|
|
@ -628,8 +628,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
|
|||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
|
||||
final BatchResult.Builder<Task> resultBuilder = BatchResult.<Task>builder().successes(tasks);
|
||||
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
|
||||
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
|
||||
Set<Snapshot> completedSnapshots = tasks.stream().map(e -> e.snapshot).collect(Collectors.toSet());
|
||||
final List<RestoreInProgress.Entry> entries = new ArrayList<>();
|
||||
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
|
||||
|
|
|
@ -359,19 +359,19 @@ public class TribeService extends AbstractLifecycleComponent {
|
|||
this.tribeName = tribeName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String describeTasks(List<ClusterChangedEvent> tasks) {
|
||||
return tasks.stream().map(ClusterChangedEvent::source).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<ClusterChangedEvent> execute(ClusterState currentState, List<ClusterChangedEvent> tasks) throws Exception {
|
||||
BatchResult.Builder<ClusterChangedEvent> builder = BatchResult.builder();
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterTasksResult<ClusterChangedEvent> execute(ClusterState currentState, List<ClusterChangedEvent> tasks) throws Exception {
|
||||
ClusterTasksResult.Builder<ClusterChangedEvent> builder = ClusterTasksResult.builder();
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState).incrementVersion();
|
||||
boolean clusterStateChanged = updateNodes(currentState, tasks, newState);
|
||||
clusterStateChanged |= updateIndicesAndMetaData(currentState, tasks, newState);
|
||||
|
|
|
@ -88,7 +88,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
|
|||
|
||||
public void testEmptyTaskListProducesSameClusterState() throws Exception {
|
||||
List<ShardStateAction.ShardEntry> tasks = Collections.emptyList();
|
||||
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result =
|
||||
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result =
|
||||
executor.execute(clusterState, tasks);
|
||||
assertTasksSuccessful(tasks, result, clusterState, false);
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
|
|||
String reason = "test duplicate failures are okay";
|
||||
ClusterState currentState = createClusterStateWithStartedShards(reason);
|
||||
List<ShardStateAction.ShardEntry> tasks = createExistingShards(currentState, reason);
|
||||
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
|
||||
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
|
||||
assertTasksSuccessful(tasks, result, clusterState, true);
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
|
|||
String reason = "test non existent shards are marked as successful";
|
||||
ClusterState currentState = createClusterStateWithStartedShards(reason);
|
||||
List<ShardStateAction.ShardEntry> tasks = createNonExistentShards(currentState, reason);
|
||||
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = executor.execute(clusterState, tasks);
|
||||
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = executor.execute(clusterState, tasks);
|
||||
assertTasksSuccessful(tasks, result, clusterState, false);
|
||||
}
|
||||
|
||||
|
@ -123,7 +123,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
|
|||
List<ShardStateAction.ShardEntry> tasks = new ArrayList<>();
|
||||
tasks.addAll(failingTasks);
|
||||
tasks.addAll(nonExistentTasks);
|
||||
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = failingExecutor.execute(currentState, tasks);
|
||||
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = failingExecutor.execute(currentState, tasks);
|
||||
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
|
||||
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))));
|
||||
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())));
|
||||
|
@ -146,7 +146,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
|
|||
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId,
|
||||
"primary term [" + task.primaryTerm + "] did not match current primary term [" +
|
||||
currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))));
|
||||
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
|
||||
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result = executor.execute(currentState, tasks);
|
||||
assertTaskResults(taskResultMap, result, currentState, false);
|
||||
}
|
||||
|
||||
|
@ -214,7 +214,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
|
|||
|
||||
private static void assertTasksSuccessful(
|
||||
List<ShardStateAction.ShardEntry> tasks,
|
||||
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result,
|
||||
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result,
|
||||
ClusterState clusterState,
|
||||
boolean clusterStateChanged
|
||||
) {
|
||||
|
@ -225,7 +225,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
|
|||
|
||||
private static void assertTaskResults(
|
||||
Map<ShardStateAction.ShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap,
|
||||
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardEntry> result,
|
||||
ClusterStateTaskExecutor.ClusterTasksResult<ShardStateAction.ShardEntry> result,
|
||||
ClusterState clusterState,
|
||||
boolean clusterStateChanged
|
||||
) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -130,22 +131,17 @@ public class ClusterStateHealthTests extends ESTestCase {
|
|||
});
|
||||
|
||||
logger.info("--> submit task to restore master");
|
||||
clusterService.submitStateUpdateTask("restore master", new ClusterStateUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("restore master", new LocalClusterUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
final DiscoveryNodes nodes = currentState.nodes();
|
||||
return ClusterState.builder(currentState).nodes(DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId())).build();
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
return newState(ClusterState.builder(currentState).nodes(
|
||||
DiscoveryNodes.builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNodeId())).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn("unexpected failure", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
|
||||
logger.info("--> waiting for listener to be called and cluster state being blocked");
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier;
|
|||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
|
@ -137,6 +138,8 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
});
|
||||
timedClusterService.setClusterStatePublisher((event, ackListener) -> {
|
||||
});
|
||||
timedClusterService.setDiscoverySettings(new DiscoverySettings(Settings.EMPTY,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
timedClusterService.start();
|
||||
ClusterState state = timedClusterService.state();
|
||||
final DiscoveryNodes nodes = state.nodes();
|
||||
|
@ -284,17 +287,12 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
|
||||
taskFailed[0] = true;
|
||||
final CountDownLatch latch2 = new CountDownLatch(1);
|
||||
nonMaster.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
|
||||
nonMaster.submitStateUpdateTask("test", new LocalClusterUpdateTask() {
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
taskFailed[0] = false;
|
||||
latch2.countDown();
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -324,14 +322,9 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
ClusterStateTaskConfig.build(Priority.NORMAL),
|
||||
new ClusterStateTaskExecutor<Object>() {
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
|
||||
public ClusterTasksResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
|
||||
ClusterState newClusterState = ClusterState.builder(currentState).build();
|
||||
return BatchResult.builder().successes(tasks).build(newClusterState);
|
||||
return ClusterTasksResult.builder().successes(tasks).build(newClusterState);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -367,19 +360,9 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
ClusterStateTaskConfig.build(Priority.NORMAL),
|
||||
new ClusterStateTaskExecutor<Object>() {
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
|
||||
public ClusterTasksResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
|
||||
ClusterState newClusterState = ClusterState.builder(currentState).build();
|
||||
return BatchResult.builder().successes(tasks).build(newClusterState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
|
||||
assertNotNull(assertionRef.get());
|
||||
return ClusterTasksResult.builder().successes(tasks).build(newClusterState);
|
||||
}
|
||||
},
|
||||
new ClusterStateTaskListener() {
|
||||
|
@ -419,16 +402,11 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
class TaskExecutor implements ClusterStateTaskExecutor<String> {
|
||||
|
||||
@Override
|
||||
public BatchResult<String> execute(ClusterState currentState, List<String> tasks) throws Exception {
|
||||
public ClusterTasksResult<String> execute(ClusterState currentState, List<String> tasks) throws Exception {
|
||||
executionOrder.addAll(tasks); // do this first, so startedProcessing can be used as a notification that this is done.
|
||||
startedProcessing.release(tasks.size());
|
||||
allowProcessing.acquire(tasks.size());
|
||||
return BatchResult.<String>builder().successes(tasks).build(ClusterState.builder(currentState).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
return ClusterTasksResult.<String>builder().successes(tasks).build(ClusterState.builder(currentState).build());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -478,14 +456,9 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
List<Integer> tasks = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public BatchResult<Integer> execute(ClusterState currentState, List<Integer> tasks) throws Exception {
|
||||
public ClusterTasksResult<Integer> execute(ClusterState currentState, List<Integer> tasks) throws Exception {
|
||||
this.tasks.addAll(tasks);
|
||||
return BatchResult.<Integer>builder().successes(tasks).build(ClusterState.builder(currentState).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
return ClusterTasksResult.<Integer>builder().successes(tasks).build(ClusterState.builder(currentState).build());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -573,7 +546,7 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
(currentState, taskList) -> {
|
||||
assertThat(taskList.size(), equalTo(tasks.size()));
|
||||
assertThat(taskList.stream().collect(Collectors.toSet()), equalTo(tasks.keySet()));
|
||||
return ClusterStateTaskExecutor.BatchResult.<Integer>builder().successes(taskList).build(currentState);
|
||||
return ClusterStateTaskExecutor.ClusterTasksResult.<Integer>builder().successes(taskList).build(currentState);
|
||||
});
|
||||
|
||||
latch.await();
|
||||
|
@ -637,7 +610,7 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
|
||||
public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
|
||||
for (Set<Task> expectedSet : taskGroups) {
|
||||
long count = tasks.stream().filter(expectedSet::contains).count();
|
||||
assertThat("batched set should be executed together or not at all. Expected " + expectedSet + "s. Executing " + tasks,
|
||||
|
@ -651,12 +624,7 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
batches.incrementAndGet();
|
||||
semaphore.acquire();
|
||||
}
|
||||
return BatchResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
return ClusterTasksResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -812,7 +780,7 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
clusterService.submitStateUpdateTask("blocking", blockingTask);
|
||||
|
||||
ClusterStateTaskExecutor<SimpleTask> executor = (currentState, tasks) ->
|
||||
ClusterStateTaskExecutor.BatchResult.<SimpleTask>builder().successes(tasks).build(currentState);
|
||||
ClusterStateTaskExecutor.ClusterTasksResult.<SimpleTask>builder().successes(tasks).build(currentState);
|
||||
|
||||
SimpleTask task = new SimpleTask(1);
|
||||
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
|
||||
|
@ -1109,6 +1077,8 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
throw new Discovery.FailedToCommitClusterStateException("just to test this");
|
||||
}
|
||||
});
|
||||
timedClusterService.setDiscoverySettings(new DiscoverySettings(Settings.EMPTY,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
timedClusterService.start();
|
||||
ClusterState state = timedClusterService.state();
|
||||
final DiscoveryNodes nodes = state.nodes();
|
||||
|
|
|
@ -117,8 +117,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(
|
||||
DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId())));
|
||||
nodeJoinController = new NodeJoinController(clusterService, createAllocationService(Settings.EMPTY),
|
||||
new ElectMasterService(Settings.EMPTY), new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), Settings.EMPTY);
|
||||
new ElectMasterService(Settings.EMPTY), Settings.EMPTY);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -32,7 +32,6 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
@ -67,7 +66,7 @@ public class NodeRemovalClusterStateTaskExecutorTests extends ESTestCase {
|
|||
.map(node -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(node, randomBoolean() ? "left" : "failed"))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final ClusterStateTaskExecutor.BatchResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result
|
||||
final ClusterStateTaskExecutor.ClusterTasksResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result
|
||||
= executor.execute(clusterState, tasks);
|
||||
assertThat(result.resultingState, equalTo(clusterState));
|
||||
}
|
||||
|
@ -106,7 +105,7 @@ public class NodeRemovalClusterStateTaskExecutorTests extends ESTestCase {
|
|||
}
|
||||
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build();
|
||||
|
||||
final ClusterStateTaskExecutor.BatchResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result =
|
||||
final ClusterStateTaskExecutor.ClusterTasksResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result =
|
||||
executor.execute(clusterState, tasks);
|
||||
verify(electMasterService).hasEnoughMasterNodes(eq(remainingNodesClusterState.get().nodes()));
|
||||
verifyNoMoreInteractions(electMasterService);
|
||||
|
@ -156,7 +155,7 @@ public class NodeRemovalClusterStateTaskExecutorTests extends ESTestCase {
|
|||
}
|
||||
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(builder).build();
|
||||
|
||||
final ClusterStateTaskExecutor.BatchResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result =
|
||||
final ClusterStateTaskExecutor.ClusterTasksResult<ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task> result =
|
||||
executor.execute(clusterState, tasks);
|
||||
verify(electMasterService).hasEnoughMasterNodes(eq(remainingNodesClusterState.get().nodes()));
|
||||
verifyNoMoreInteractions(electMasterService);
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
|
@ -395,9 +395,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
|
|||
// disable relocations when we do this, to make sure the shards are not relocated from node2
|
||||
// due to rebalancing, and delete its content
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)).get();
|
||||
internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
indexRoutingTableBuilder.addIndexShard(
|
||||
|
@ -406,14 +406,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
|
|||
.build()
|
||||
);
|
||||
}
|
||||
return ClusterState.builder(currentState)
|
||||
return newState(ClusterState.builder(currentState)
|
||||
.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder).build())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,10 +22,10 @@ package org.elasticsearch.tribe;
|
|||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -556,7 +556,7 @@ public class TribeIT extends ESIntegTestCase {
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
@Override
|
||||
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -564,7 +564,7 @@ public class TribeIT extends ESIntegTestCase {
|
|||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
MetaData.Builder builder = MetaData.builder(currentState.metaData());
|
||||
builder = addCustoms.apply(builder);
|
||||
return new ClusterState.Builder(currentState).metaData(builder).build();
|
||||
return ClusterState.builder(currentState).metaData(builder).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,15 +20,15 @@ package org.elasticsearch.test;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.NodeConnectionsService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -65,6 +65,8 @@ public class ClusterServiceUtils {
|
|||
});
|
||||
clusterService.setClusterStatePublisher((event, ackListener) -> {
|
||||
});
|
||||
clusterService.setDiscoverySettings(new DiscoverySettings(Settings.EMPTY,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
clusterService.start();
|
||||
final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes());
|
||||
nodes.masterNodeId(clusterService.localNode().getId());
|
||||
|
@ -84,16 +86,11 @@ public class ClusterServiceUtils {
|
|||
|
||||
public static void setState(ClusterService clusterService, ClusterState clusterState) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
clusterService.submitStateUpdateTask("test setting state", new ClusterStateUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("test setting state", new LocalClusterUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
// make sure we increment versions as listener may depend on it for change
|
||||
return ClusterState.builder(clusterState).version(currentState.version() + 1).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
return newState(ClusterState.builder(clusterState).version(currentState.version() + 1).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.elasticsearch.test.disruption;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -58,21 +58,16 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption {
|
|||
boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1));
|
||||
assert success : "startDisrupting called without waiting on stopDisrupting to complete";
|
||||
final CountDownLatch started = new CountDownLatch(1);
|
||||
clusterService.submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
clusterService.submitStateUpdateTask("service_disruption_block", new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
started.countDown();
|
||||
CountDownLatch latch = disruptionLatch.get();
|
||||
if (latch != null) {
|
||||
latch.await();
|
||||
}
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.elasticsearch.test.disruption;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalClusterUpdateTask;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -102,15 +102,10 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
|
|||
return false;
|
||||
}
|
||||
final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||
clusterService.submitStateUpdateTask("service_disruption_delay", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
clusterService.submitStateUpdateTask("service_disruption_delay", new LocalClusterUpdateTask(Priority.IMMEDIATE) {
|
||||
|
||||
@Override
|
||||
public boolean runOnlyOnMaster() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
|
||||
long count = duration.millis() / 200;
|
||||
// wait while checking for a stopped
|
||||
for (; count > 0 && !stopped.get(); count--) {
|
||||
|
@ -120,7 +115,7 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
|
|||
Thread.sleep(duration.millis() % 200);
|
||||
}
|
||||
countDownLatch.countDown();
|
||||
return currentState;
|
||||
return unchanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue