Split cluster state update tasks into roles

This commit splits cluster state update tasks into roles. Those roles
are:
 - task info
 - task configuration
 - task executor
 - task listener

All tasks that have the same executor will be executed in batches. This
removes the need for local batching as was previously in
MetaDataMappingService.

Additionally, this commit reintroduces batching on mapping update calls.

Relates #13627
This commit is contained in:
Boaz Leskes 2015-10-23 21:11:15 +02:00 committed by Jason Tedor
parent a4e22b44e4
commit f0f89e708d
30 changed files with 856 additions and 689 deletions

View File

@ -74,7 +74,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) {
if (request.waitForEvents() != null) {
final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;

View File

@ -68,7 +68,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterRerouteResponse>(request, listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)", new AckedClusterStateUpdateTask<ClusterRerouteResponse>(Priority.IMMEDIATE, request, listener) {
private volatile ClusterState clusterStateToSend;
private volatile RoutingExplanations explanations;

View File

@ -91,7 +91,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
final Settings.Builder transientUpdates = Settings.settingsBuilder();
final Settings.Builder persistentUpdates = Settings.settingsBuilder();
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
clusterService.submitStateUpdateTask("cluster_update_settings",
new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(Priority.IMMEDIATE, request, listener) {
private volatile boolean changed = false;
@ -132,7 +133,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
// in the components (e.g. FilterAllocationDecider), so the changes made by the first call aren't visible
// to the components until the ClusterStateListener instances have been invoked, but are visible after
// the first update task has been completed.
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings",
new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(Priority.URGENT, request, listener) {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
public interface AckedClusterStateTaskListener extends ClusterStateTaskListener {
/**
* Called to determine which nodes the acknowledgement is expected from
*
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
*/
boolean mustAck(DiscoveryNode discoveryNode);
/**
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
*
* @param t optional error that might have been thrown
*/
void onAllNodesAcked(@Nullable Throwable t);
/**
* Called once the acknowledgement timeout defined by
* {@link AckedClusterStateUpdateTask#ackTimeout()} has expired
*/
void onAckTimeout();
/**
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
*/
TimeValue ackTimeout();
}

View File

@ -22,18 +22,24 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
/**
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* all the nodes have acknowledged a cluster state update request
*/
public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask {
public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask implements AckedClusterStateTaskListener {
private final ActionListener<Response> listener;
private final AckedRequest request;
protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Response> listener) {
this(Priority.NORMAL, request, listener);
}
protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener<Response> listener) {
super(priority);
this.listener = listener;
this.request = request;
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
@ -101,12 +100,16 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener);
/**
* Submits a task that will update the cluster state.
* Submits a task that will update the cluster state, using the given config. result will communicated
* to the given listener
*/
void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask);
<T> void submitStateUpdateTask(final String source, final T task,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor,
final ClusterStateTaskListener listener);
/**
* Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}).
* Submits a task that will update the cluster state;
*/
void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask);

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
public interface ClusterStateTaskConfig {
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link ClusterStateTaskListener#onFailure(String, Throwable)}. May return null to indicate no timeout is needed (default).
*/
@Nullable
TimeValue timeout();
Priority priority();
static ClusterStateTaskConfig build(Priority priority) {
return new Basic(priority, null);
}
static ClusterStateTaskConfig build(Priority priority, TimeValue timeout) {
return new Basic(priority, timeout);
}
class Basic implements ClusterStateTaskConfig {
final TimeValue timeout;
final Priority priority;
public Basic(Priority priority, TimeValue timeout) {
this.timeout = timeout;
this.priority = priority;
}
@Override
public TimeValue timeout() {
return timeout;
}
@Override
public Priority priority() {
return priority;
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import java.util.Arrays;
import java.util.List;
public interface ClusterStateTaskExecutor<T> {
/**
* Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state
* should be changed.
*/
Result execute(ClusterState currentState, List<T> tasks) throws Exception;
/**
* indicates whether this task should only run if current node is master
*/
default boolean runOnlyOnMaster() {
return true;
}
class Result {
final public ClusterState resultingState;
final public List<Throwable> failures;
public Result(ClusterState resultingState, int numberOfTasks) {
this.resultingState = resultingState;
failures = Arrays.asList(new Throwable[numberOfTasks]);
}
public Result(ClusterState resultingState, List<Throwable> failures) {
this.resultingState = resultingState;
this.failures = failures;
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.util.List;
public interface ClusterStateTaskListener {
/**
* A callback called when execute fails.
*/
void onFailure(String source, Throwable t);
/**
* called when the task was rejected because the local node is no longer master
*/
default void onNoLongerMaster(String source) {
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
}
/**
* Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
* properly by all listeners.
*/
default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
;
}

View File

@ -20,13 +20,31 @@
package org.elasticsearch.cluster;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.util.List;
/**
* A task that can update the cluster state.
*/
abstract public class ClusterStateUpdateTask {
abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<Void>, ClusterStateTaskListener {
final private Priority priority;
public ClusterStateUpdateTask() {
this(Priority.NORMAL);
}
public ClusterStateUpdateTask(Priority priority) {
this.priority = priority;
}
@Override
final public Result execute(ClusterState currentState, List<Void> tasks) throws Exception {
ClusterState result = execute(currentState);
return new Result(result, tasks.size());
}
/**
* Update the cluster state based on the current state. Return the *same instance* if no state
@ -39,28 +57,6 @@ abstract public class ClusterStateUpdateTask {
*/
abstract public void onFailure(String source, Throwable t);
/**
* indicates whether this task should only run if current node is master
*/
public boolean runOnlyOnMaster() {
return true;
}
/**
* called when the task was rejected because the local node is no longer master
*/
public void onNoLongerMaster(String source) {
onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
}
/**
* Called when the result of the {@link #execute(ClusterState)} have been processed
* properly by all listeners.
*/
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}. May return null to indicate no timeout is needed (default).
@ -70,5 +66,8 @@ abstract public class ClusterStateUpdateTask {
return null;
}
@Override
public Priority priority() {
return priority;
}
}

View File

@ -144,7 +144,8 @@ public class ShardStateAction extends AbstractComponent {
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public ClusterState execute(ClusterState currentState) {
@ -198,8 +199,13 @@ public class ShardStateAction extends AbstractComponent {
// process started events as fast as possible, to make shards available
startedShardsQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", Priority.URGENT,
clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
new ClusterStateUpdateTask() {
@Override
public Priority priority() {
return Priority.URGENT;
}
@Override
public ClusterState execute(ClusterState currentState) {

View File

@ -170,12 +170,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
request.settings(updatedSettingsBuilder.build());
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {

View File

@ -39,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -71,7 +70,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
Collection<String> indices = Arrays.asList(request.indices);
final DeleteIndexListener listener = new DeleteIndexListener(userListener);
clusterService.submitStateUpdateTask("delete-index " + indices, Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("delete-index " + indices, new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public TimeValue timeout() {

View File

@ -62,7 +62,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
}
public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
clusterService.submitStateUpdateTask("index-aliases", new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);

View File

@ -76,7 +76,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
}
final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
@ -140,7 +140,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
}
final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);

View File

@ -56,7 +56,7 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
}
public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public TimeValue timeout() {
@ -143,7 +143,8 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
}
final IndexTemplateMetaData template = templateBuilder.build();
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]",
new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public TimeValue timeout() {

View File

@ -22,17 +22,17 @@ package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.NodeServicesProvider;
@ -44,6 +44,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;
import org.elasticsearch.percolator.PercolatorService;
import java.io.IOException;
import java.util.*;
/**
* Service responsible for submitting mapping changes
@ -53,13 +54,11 @@ public class MetaDataMappingService extends AbstractComponent {
private final ClusterService clusterService;
private final IndicesService indicesService;
// the mutex protect all the refreshOrUpdate variables!
private final Object refreshOrUpdateMutex = new Object();
private final List<MappingTask> refreshOrUpdateQueue = new ArrayList<>();
private long refreshOrUpdateInsertOrder;
private long refreshOrUpdateProcessedInsertOrder;
final ClusterStateTaskExecutor<RefreshTask> refreshExectuor = new RefreshTaskExecutor();
final ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> putMappingExecutor = new PutMappingExecutor();
private final NodeServicesProvider nodeServicesProvider;
@Inject
public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeServicesProvider nodeServicesProvider) {
super(settings);
@ -68,37 +67,23 @@ public class MetaDataMappingService extends AbstractComponent {
this.nodeServicesProvider = nodeServicesProvider;
}
static class MappingTask {
static class RefreshTask {
final String index;
final String indexUUID;
MappingTask(String index, final String indexUUID) {
this.index = index;
this.indexUUID = indexUUID;
}
}
static class RefreshTask extends MappingTask {
final String[] types;
RefreshTask(String index, final String indexUUID, String[] types) {
super(index, indexUUID);
this.index = index;
this.indexUUID = indexUUID;
this.types = types;
}
}
static class UpdateTask extends MappingTask {
final String type;
final CompressedXContent mappingSource;
final String nodeId; // null fr unknown
final ActionListener<ClusterStateUpdateResponse> listener;
UpdateTask(String index, String indexUUID, String type, CompressedXContent mappingSource, String nodeId, ActionListener<ClusterStateUpdateResponse> listener) {
super(index, indexUUID);
this.type = type;
this.mappingSource = mappingSource;
this.nodeId = nodeId;
this.listener = listener;
class RefreshTaskExecutor implements ClusterStateTaskExecutor<RefreshTask> {
@Override
public Result execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
ClusterState newClusterState = executeRefresh(currentState, tasks);
return new Result(newClusterState, tasks.size());
}
}
@ -107,39 +92,19 @@ public class MetaDataMappingService extends AbstractComponent {
* as possible so we won't create the same index all the time for example for the updates on the same mapping
* and generate a single cluster change event out of all of those.
*/
Tuple<ClusterState, List<MappingTask>> executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
final List<MappingTask> allTasks = new ArrayList<>();
synchronized (refreshOrUpdateMutex) {
if (refreshOrUpdateQueue.isEmpty()) {
return Tuple.tuple(currentState, allTasks);
}
// we already processed this task in a bulk manner in a previous cluster event, simply ignore
// it so we will let other tasks get in and processed ones, we will handle the queued ones
// later on in a subsequent cluster state event
if (insertionOrder < refreshOrUpdateProcessedInsertOrder) {
return Tuple.tuple(currentState, allTasks);
}
allTasks.addAll(refreshOrUpdateQueue);
refreshOrUpdateQueue.clear();
refreshOrUpdateProcessedInsertOrder = refreshOrUpdateInsertOrder;
}
ClusterState executeRefresh(final ClusterState currentState, final List<RefreshTask> allTasks) throws Exception {
if (allTasks.isEmpty()) {
return Tuple.tuple(currentState, allTasks);
return currentState;
}
// break down to tasks per index, so we can optimize the on demand index service creation
// to only happen for the duration of a single index processing of its respective events
Map<String, List<MappingTask>> tasksPerIndex = new HashMap<>();
for (MappingTask task : allTasks) {
Map<String, List<RefreshTask>> tasksPerIndex = new HashMap<>();
for (RefreshTask task : allTasks) {
if (task.index == null) {
logger.debug("ignoring a mapping task of type [{}] with a null index.", task);
}
List<MappingTask> indexTasks = tasksPerIndex.get(task.index);
List<RefreshTask> indexTasks = tasksPerIndex.get(task.index);
if (indexTasks == null) {
indexTasks = new ArrayList<>();
tasksPerIndex.put(task.index, indexTasks);
@ -150,7 +115,7 @@ public class MetaDataMappingService extends AbstractComponent {
boolean dirty = false;
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
for (Map.Entry<String, List<MappingTask>> entry : tasksPerIndex.entrySet()) {
for (Map.Entry<String, List<RefreshTask>> entry : tasksPerIndex.entrySet()) {
String index = entry.getKey();
IndexMetaData indexMetaData = mdBuilder.get(index);
if (indexMetaData == null) {
@ -160,9 +125,9 @@ public class MetaDataMappingService extends AbstractComponent {
}
// the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep
// the latest (based on order) update mapping one per node
List<MappingTask> allIndexTasks = entry.getValue();
List<MappingTask> tasks = new ArrayList<>();
for (MappingTask task : allIndexTasks) {
List<RefreshTask> allIndexTasks = entry.getValue();
List<RefreshTask> tasks = new ArrayList<>();
for (RefreshTask task : allIndexTasks) {
if (!indexMetaData.isSameUUID(task.indexUUID)) {
logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task);
continue;
@ -178,12 +143,8 @@ public class MetaDataMappingService extends AbstractComponent {
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
removeIndex = true;
Set<String> typesToIntroduce = new HashSet<>();
for (MappingTask task : tasks) {
if (task instanceof UpdateTask) {
typesToIntroduce.add(((UpdateTask) task).type);
} else if (task instanceof RefreshTask) {
Collections.addAll(typesToIntroduce, ((RefreshTask) task).types);
}
for (RefreshTask task : tasks) {
Collections.addAll(typesToIntroduce, task.types);
}
for (String type : typesToIntroduce) {
// only add the current relevant mapping (if exists)
@ -209,80 +170,42 @@ public class MetaDataMappingService extends AbstractComponent {
}
if (!dirty) {
return Tuple.tuple(currentState, allTasks);
return currentState;
}
return Tuple.tuple(ClusterState.builder(currentState).metaData(mdBuilder).build(), allTasks);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
private boolean processIndexMappingTasks(List<MappingTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
private boolean processIndexMappingTasks(List<RefreshTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
boolean dirty = false;
String index = indexService.index().name();
// keep track of what we already refreshed, no need to refresh it again...
Set<String> processedRefreshes = new HashSet<>();
for (MappingTask task : tasks) {
if (task instanceof RefreshTask) {
RefreshTask refreshTask = (RefreshTask) task;
try {
List<String> updatedTypes = new ArrayList<>();
for (String type : refreshTask.types) {
if (processedRefreshes.contains(type)) {
continue;
}
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
if (mapper == null) {
continue;
}
if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
updatedTypes.add(type);
builder.putMapping(new MappingMetaData(mapper));
}
processedRefreshes.add(type);
}
if (updatedTypes.isEmpty()) {
for (RefreshTask refreshTask : tasks) {
try {
List<String> updatedTypes = new ArrayList<>();
for (String type : refreshTask.types) {
if (processedRefreshes.contains(type)) {
continue;
}
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
dirty = true;
} catch (Throwable t) {
logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
}
} else if (task instanceof UpdateTask) {
UpdateTask updateTask = (UpdateTask) task;
try {
String type = updateTask.type;
CompressedXContent mappingSource = updateTask.mappingSource;
MappingMetaData mappingMetaData = builder.mapping(type);
if (mappingMetaData != null && mappingMetaData.source().equals(mappingSource)) {
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
if (mapper == null) {
continue;
}
DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false, true);
if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
updatedTypes.add(type);
builder.putMapping(new MappingMetaData(mapper));
}
processedRefreshes.add(type);
// if we end up with the same mapping as the original once, ignore
if (mappingMetaData != null && mappingMetaData.source().equals(updatedMapper.mappingSource())) {
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
continue;
}
// build the updated mapping source
if (logger.isDebugEnabled()) {
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
} else if (logger.isInfoEnabled()) {
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
}
builder.putMapping(new MappingMetaData(updatedMapper));
dirty = true;
} catch (Throwable t) {
logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type);
}
} else {
logger.warn("illegal state, got wrong mapping task type [{}]", task);
if (updatedTypes.isEmpty()) {
continue;
}
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
dirty = true;
} catch (Throwable t) {
logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
}
}
return dirty;
@ -292,197 +215,203 @@ public class MetaDataMappingService extends AbstractComponent {
* Refreshes mappings if they are not the same between original and parsed version
*/
public void refreshMapping(final String index, final String indexUUID, final String... types) {
final long insertOrder;
synchronized (refreshOrUpdateMutex) {
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
}
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;
final RefreshTask refreshTask = new RefreshTask(index, indexUUID, types);
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]",
refreshTask,
ClusterStateTaskConfig.build(Priority.HIGH),
refreshExectuor,
(source, t) -> logger.warn("failure during [{}]", t, source)
);
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failure during [{}]", t, source);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
@Override
public Result execute(ClusterState currentState, List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
List<String> indicesToClose = new ArrayList<>();
ArrayList<Throwable> failures = new ArrayList<>(tasks.size());
try {
// precreate incoming indices;
for (PutMappingClusterStateUpdateRequest request : tasks) {
// failures here mean something is broken with our cluster state - fail all tasks by letting exceptions bubble up
for (String index : request.indices()) {
if (currentState.metaData().hasIndex(index)) {
// if we don't have the index, we will throw exceptions later;
if (indicesService.hasIndex(index) == false) {
final IndexMetaData indexMetaData = currentState.metaData().index(index);
IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
indicesToClose.add(indexMetaData.getIndex());
// make sure to add custom default mapping if exists
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes());
}
// only add the current relevant mapping (if exists)
if (indexMetaData.getMappings().containsKey(request.type())) {
indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes());
}
}
}
}
}
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
for (PutMappingClusterStateUpdateRequest request : tasks) {
try {
currentState = applyRequest(currentState, request);
failures.add(null);
} catch (Throwable t) {
failures.add(t);
}
}
return new Result(currentState, failures);
} finally {
for (String index : indicesToClose) {
indicesService.removeIndex(index, "created for mapping processing");
}
}
}
private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request) throws IOException {
Map<String, DocumentMapper> newMappers = new HashMap<>();
Map<String, DocumentMapper> existingMappers = new HashMap<>();
for (String index : request.indices()) {
IndexService indexService = indicesService.indexServiceSafe(index);
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper;
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
// _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), false);
} else {
newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), existingMapper == null);
if (existingMapper != null) {
// first, simulate
MergeResult mergeResult = existingMapper.merge(newMapper.mapping(), true, request.updateAllTypes());
// if we have conflicts, throw an exception
if (mergeResult.hasConflicts()) {
throw new MergeMappingException(mergeResult.buildConflicts());
}
} else {
// TODO: can we find a better place for this validation?
// The reason this validation is here is that the mapper service doesn't learn about
// new types all at once , which can create a false error.
// For example in MapperService we can't distinguish between a create index api call
// and a put mapping api call, so we don't which type did exist before.
// Also the order of the mappings may be backwards.
if (newMapper.parentFieldMapper().active()) {
IndexMetaData indexMetaData = currentState.metaData().index(index);
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
if (newMapper.parentFieldMapper().type().equals(mapping.value.type())) {
throw new IllegalArgumentException("can't add a _parent field that points to an already existing type");
}
}
}
}
}
newMappers.put(index, newMapper);
if (existingMapper != null) {
existingMappers.put(index, existingMapper);
}
}
String mappingType = request.type();
if (mappingType == null) {
mappingType = newMappers.values().iterator().next().type();
} else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
}
if (!MapperService.DEFAULT_MAPPING.equals(mappingType) && !PercolatorService.TYPE_NAME.equals(mappingType) && mappingType.charAt(0) == '_') {
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
}
final Map<String, MappingMetaData> mappings = new HashMap<>();
for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
String index = entry.getKey();
// do the actual merge here on the master, and update the mapping source
DocumentMapper newMapper = entry.getValue();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
CompressedXContent existingSource = null;
if (existingMappers.containsKey(entry.getKey())) {
existingSource = existingMappers.get(entry.getKey()).mappingSource();
}
DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false, request.updateAllTypes());
CompressedXContent updatedSource = mergedMapper.mappingSource();
if (existingSource != null) {
if (existingSource.equals(updatedSource)) {
// same source, no changes, ignore it
} else {
// use the merged mapping source
mappings.put(index, new MappingMetaData(mergedMapper));
if (logger.isDebugEnabled()) {
logger.debug("[{}] update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
} else if (logger.isInfoEnabled()) {
logger.info("[{}] update_mapping [{}]", index, mergedMapper.type());
}
}
} else {
mappings.put(index, new MappingMetaData(mergedMapper));
if (logger.isDebugEnabled()) {
logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), updatedSource);
} else if (logger.isInfoEnabled()) {
logger.info("[{}] create_mapping [{}]", index, newMapper.type());
}
}
}
});
if (mappings.isEmpty()) {
// no changes, return
return currentState;
}
MetaData.Builder builder = MetaData.builder(currentState.metaData());
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData == null) {
throw new IndexNotFoundException(indexName);
}
MappingMetaData mappingMd = mappings.get(indexName);
if (mappingMd != null) {
builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd));
}
}
return ClusterState.builder(currentState).metaData(builder).build();
}
}
public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]",
request,
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
putMappingExecutor,
new AckedClusterStateTaskListener() {
clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
List<String> indicesToClose = new ArrayList<>();
try {
for (String index : request.indices()) {
if (!currentState.metaData().hasIndex(index)) {
throw new IndexNotFoundException(index);
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}
// pre create indices here and add mappings to them so we can merge the mappings here if needed
for (String index : request.indices()) {
if (indicesService.hasIndex(index)) {
continue;
}
final IndexMetaData indexMetaData = currentState.metaData().index(index);
IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST);
indicesToClose.add(indexMetaData.getIndex());
// make sure to add custom default mapping if exists
if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) {
indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes());
}
// only add the current relevant mapping (if exists)
if (indexMetaData.getMappings().containsKey(request.type())) {
indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes());
}
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
Map<String, DocumentMapper> newMappers = new HashMap<>();
Map<String, DocumentMapper> existingMappers = new HashMap<>();
for (String index : request.indices()) {
IndexService indexService = indicesService.indexServiceSafe(index);
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper;
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
// _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), false);
} else {
newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), existingMapper == null);
if (existingMapper != null) {
// first, simulate
MergeResult mergeResult = existingMapper.merge(newMapper.mapping(), true, request.updateAllTypes());
// if we have conflicts, throw an exception
if (mergeResult.hasConflicts()) {
throw new MergeMappingException(mergeResult.buildConflicts());
}
} else {
// TODO: can we find a better place for this validation?
// The reason this validation is here is that the mapper service doesn't learn about
// new types all at once , which can create a false error.
// For example in MapperService we can't distinguish between a create index api call
// and a put mapping api call, so we don't which type did exist before.
// Also the order of the mappings may be backwards.
if (newMapper.parentFieldMapper().active()) {
IndexMetaData indexMetaData = currentState.metaData().index(index);
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
if (newMapper.parentFieldMapper().type().equals(mapping.value.type())) {
throw new IllegalArgumentException("can't add a _parent field that points to an already existing type");
}
}
}
}
}
newMappers.put(index, newMapper);
if (existingMapper != null) {
existingMappers.put(index, existingMapper);
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
String mappingType = request.type();
if (mappingType == null) {
mappingType = newMappers.values().iterator().next().type();
} else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
}
if (!MapperService.DEFAULT_MAPPING.equals(mappingType) && !PercolatorService.TYPE_NAME.equals(mappingType) && mappingType.charAt(0) == '_') {
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
final Map<String, MappingMetaData> mappings = new HashMap<>();
for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
String index = entry.getKey();
// do the actual merge here on the master, and update the mapping source
DocumentMapper newMapper = entry.getValue();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
CompressedXContent existingSource = null;
if (existingMappers.containsKey(entry.getKey())) {
existingSource = existingMappers.get(entry.getKey()).mappingSource();
}
DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false, request.updateAllTypes());
CompressedXContent updatedSource = mergedMapper.mappingSource();
if (existingSource != null) {
if (existingSource.equals(updatedSource)) {
// same source, no changes, ignore it
} else {
// use the merged mapping source
mappings.put(index, new MappingMetaData(mergedMapper));
if (logger.isDebugEnabled()) {
logger.debug("[{}] update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
} else if (logger.isInfoEnabled()) {
logger.info("[{}] update_mapping [{}]", index, mergedMapper.type());
}
}
} else {
mappings.put(index, new MappingMetaData(mergedMapper));
if (logger.isDebugEnabled()) {
logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), updatedSource);
} else if (logger.isInfoEnabled()) {
logger.info("[{}] create_mapping [{}]", index, newMapper.type());
}
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
if (mappings.isEmpty()) {
// no changes, return
return currentState;
}
MetaData.Builder builder = MetaData.builder(currentState.metaData());
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData == null) {
throw new IndexNotFoundException(indexName);
}
MappingMetaData mappingMd = mappings.get(indexName);
if (mappingMd != null) {
builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd));
}
}
return ClusterState.builder(currentState).metaData(builder).build();
} finally {
for (String index : indicesToClose) {
indicesService.removeIndex(index, "created for mapping processing");
}
}
}
});
});
}
}

View File

@ -24,11 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -44,13 +40,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.settings.IndexDynamicSettings;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -219,7 +209,8 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
final Settings openSettings = updatedSettingsBuilder.build();
clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
clusterService.submitStateUpdateTask("update-settings",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
@ -334,7 +325,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
public void upgradeIndexSettings(final UpgradeSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("update-index-compatibility-versions", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
clusterService.submitStateUpdateTask("update-index-compatibility-versions", new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

View File

@ -147,7 +147,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
return;
}
logger.trace("rerouting {}", reason);
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public ClusterState execute(ClusterState currentState) {
rerouting.set(false);

View File

@ -20,16 +20,8 @@
package org.elasticsearch.cluster.service;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
@ -41,6 +33,7 @@ import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@ -49,13 +42,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
@ -63,18 +50,10 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
@ -111,6 +90,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final Collection<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
// TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API
private final Collection<ClusterStateListener> postAppliedListeners = new CopyOnWriteArrayList<>();
private final Iterable<ClusterStateListener> preAppliedListeners = Iterables.concat(priorityClusterStateListeners, clusterStateListeners, lastClusterStateListeners);
@ -289,30 +269,47 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
submitStateUpdateTask(source, Priority.NORMAL, updateTask);
submitStateUpdateTask(source, null, updateTask, updateTask, updateTask);
}
@Override
public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
public <T> void submitStateUpdateTask(final String source, final T task,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor,
final ClusterStateTaskListener listener
) {
if (!lifecycle.started()) {
return;
}
try {
final UpdateTask task = new UpdateTask(source, priority, updateTask);
if (updateTask.timeout() != null) {
updateTasksExecutor.execute(task, threadPool.scheduler(), updateTask.timeout(), new Runnable() {
final UpdateTask<T> updateTask = new UpdateTask<>(source, task, config, executor, listener);
synchronized (updateTasksPerExecutor) {
List<UpdateTask> pendingTasks = updateTasksPerExecutor.get(executor);
if (pendingTasks == null) {
pendingTasks = new ArrayList<>();
updateTasksPerExecutor.put(executor, pendingTasks);
}
pendingTasks.add(updateTask);
}
if (config.timeout() != null) {
updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), new Runnable() {
@Override
public void run() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
updateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(updateTask.timeout(), task.source()));
if (updateTask.processed.getAndSet(true) == false) {
listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
}
}
});
}
});
} else {
updateTasksExecutor.execute(task);
updateTasksExecutor.execute(updateTask);
}
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
@ -379,188 +376,241 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
class UpdateTask extends SourcePrioritizedRunnable {
<T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>();
final ArrayList<String> sources = new ArrayList<>();
synchronized (updateTasksPerExecutor) {
List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
if (pending != null) {
for (Iterator<UpdateTask> iter = pending.iterator(); iter.hasNext(); ) {
UpdateTask task = iter.next();
if (task.processed.getAndSet(true) == false) {
logger.trace("will process [{}]", task.source);
toExecute.add((UpdateTask<T>) task);
sources.add(task.source);
} else {
logger.trace("skipping [{}], already processed", task.source);
}
}
}
}
if (toExecute.isEmpty()) {
return;
}
final String source = Strings.collectionToCommaDelimitedString(sources);
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster_service not started", source);
return;
}
logger.debug("processing [{}]: execute", source);
ClusterState previousClusterState = clusterState;
if (!previousClusterState.nodes().localNodeMaster() && executor.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", source);
toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
return;
}
ClusterStateTaskExecutor.Result result;
long startTimeNS = System.nanoTime();
try {
List<T> inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
result = executor.execute(previousClusterState, inputs);
} catch (Throwable e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(previousClusterState.nodes().prettyPrint());
sb.append(previousClusterState.routingTable().prettyPrint());
sb.append(previousClusterState.getRoutingNodes().prettyPrint());
logger.trace(sb.toString(), e);
}
warnAboutSlowTaskIfNeeded(executionTime, source);
result = new ClusterStateTaskExecutor.Result(previousClusterState, Collections.nCopies(toExecute.size(), e));
}
assert result.failures.size() == toExecute.size();
public final ClusterStateUpdateTask updateTask;
UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
super(priority, source);
this.updateTask = updateTask;
ClusterState newClusterState = result.resultingState;
final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
// fail all tasks that have failed and extract those that are waiting for results
for (int i = 0; i < toExecute.size(); i++) {
final UpdateTask<T> task = toExecute.get(i);
final Throwable failure = result.failures.get(i);
if (failure == null) {
proccessedListeners.add(task);
} else {
task.listener.onFailure(task.source, failure);
}
}
@Override
public void run() {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster_service not started", source);
return;
}
logger.debug("processing [{}]: execute", source);
ClusterState previousClusterState = clusterState;
if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", source);
updateTask.onNoLongerMaster(source);
return;
}
ClusterState newClusterState;
long startTimeNS = System.nanoTime();
try {
newClusterState = updateTask.execute(previousClusterState);
} catch (Throwable e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(previousClusterState.nodes().prettyPrint());
sb.append(previousClusterState.routingTable().prettyPrint());
sb.append(previousClusterState.getRoutingNodes().prettyPrint());
logger.trace(sb.toString(), e);
}
warnAboutSlowTaskIfNeeded(executionTime, source);
updateTask.onFailure(source, e);
return;
}
if (previousClusterState == newClusterState) {
if (updateTask instanceof AckedClusterStateUpdateTask) {
if (previousClusterState == newClusterState) {
for (UpdateTask<T> task : proccessedListeners) {
if (task.listener instanceof AckedClusterStateTaskListener) {
//no need to wait for ack if nothing changed, the update can be counted as acknowledged
((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null);
}
updateTask.clusterStateProcessed(source, previousClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
return;
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
return;
}
try {
Discovery.AckListener ackListener = new NoOpAckListener();
if (newClusterState.nodes().localNodeMaster()) {
// only the master controls the version numbers
Builder builder = ClusterState.builder(newClusterState).incrementVersion();
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build());
}
if (previousClusterState.metaData() != newClusterState.metaData()) {
builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
}
newClusterState = builder.build();
if (updateTask instanceof AckedClusterStateUpdateTask) {
final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
ackedUpdateTask.onAckTimeout();
try {
ArrayList<Discovery.AckListener> ackListeners = new ArrayList<>();
if (newClusterState.nodes().localNodeMaster()) {
// only the master controls the version numbers
Builder builder = ClusterState.builder(newClusterState).incrementVersion();
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build());
}
if (previousClusterState.metaData() != newClusterState.metaData()) {
builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
}
newClusterState = builder.build();
for (UpdateTask<T> task : proccessedListeners) {
if (task.listener instanceof AckedClusterStateTaskListener) {
final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener;
if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) {
ackedListener.onAckTimeout();
} else {
try {
ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(), threadPool));
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
}
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
ackedUpdateTask.onAckTimeout();
ackedListener.onAckTimeout();
}
}
}
}
newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
sb.append(newClusterState.prettyPrint());
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
}
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String summary = nodesDelta.shortSummary();
if (summary.length() > 0) {
logger.info("{}, reason: {}", summary, source);
}
}
// TODO, do this in parallel (and wait)
for (DiscoveryNode node : nodesDelta.addedNodes()) {
if (!nodeRequiresConnection(node)) {
continue;
}
try {
transportService.connectToNode(node);
} catch (Throwable e) {
// the fault detection will detect it as failed as well
logger.warn("failed to connect to node [" + node + "]", e);
}
}
// if we are the master, publish the new state to all nodes
// we publish here before we send a notification to all the listeners, since if it fails
// we don't want to notify
if (newClusterState.nodes().localNodeMaster()) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
try {
discoveryService.publish(clusterChangedEvent, ackListener);
} catch (Discovery.FailedToCommitClusterStateException t) {
logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version());
updateTask.onFailure(source, t);
return;
}
}
// update the current cluster state
clusterState = newClusterState;
logger.debug("set local cluster state to version {}", newClusterState.version());
for (ClusterStateListener listener : preAppliedListeners) {
try {
listener.clusterChanged(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
}
for (DiscoveryNode node : nodesDelta.removedNodes()) {
try {
transportService.disconnectFromNode(node);
} catch (Throwable e) {
logger.warn("failed to disconnect to node [" + node + "]", e);
}
}
newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
for (ClusterStateListener listener : postAppliedListeners) {
try {
listener.clusterChanged(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
}
//manual ack only from the master at the end of the publish
if (newClusterState.nodes().localNodeMaster()) {
try {
ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
} catch (Throwable t) {
logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
}
}
updateTask.clusterStateProcessed(source, previousClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, source);
} catch (Throwable t) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.stateUUID()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
sb.append(newClusterState.routingTable().prettyPrint());
sb.append(newClusterState.getRoutingNodes().prettyPrint());
logger.warn(sb.toString(), t);
// TODO: do we want to call updateTask.onFailure here?
}
final Discovery.AckListener ackListener = new DelegetingAckListener(ackListeners);
newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
sb.append(newClusterState.prettyPrint());
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
}
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String summary = nodesDelta.shortSummary();
if (summary.length() > 0) {
logger.info("{}, reason: {}", summary, source);
}
}
// TODO, do this in parallel (and wait)
for (DiscoveryNode node : nodesDelta.addedNodes()) {
if (!nodeRequiresConnection(node)) {
continue;
}
try {
transportService.connectToNode(node);
} catch (Throwable e) {
// the fault detection will detect it as failed as well
logger.warn("failed to connect to node [" + node + "]", e);
}
}
// if we are the master, publish the new state to all nodes
// we publish here before we send a notification to all the listeners, since if it fails
// we don't want to notify
if (newClusterState.nodes().localNodeMaster()) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
try {
discoveryService.publish(clusterChangedEvent, ackListener);
} catch (Discovery.FailedToCommitClusterStateException t) {
logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version());
proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t));
return;
}
}
// update the current cluster state
clusterState = newClusterState;
logger.debug("set local cluster state to version {}", newClusterState.version());
for (ClusterStateListener listener : preAppliedListeners) {
try {
listener.clusterChanged(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
}
for (DiscoveryNode node : nodesDelta.removedNodes()) {
try {
transportService.disconnectFromNode(node);
} catch (Throwable e) {
logger.warn("failed to disconnect to node [" + node + "]", e);
}
}
newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
for (ClusterStateListener listener : postAppliedListeners) {
try {
listener.clusterChanged(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
}
//manual ack only from the master at the end of the publish
if (newClusterState.nodes().localNodeMaster()) {
try {
ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
} catch (Throwable t) {
logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
}
}
for (UpdateTask<T> task : proccessedListeners) {
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, source);
} catch (Throwable t) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.stateUUID()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
sb.append(newClusterState.routingTable().prettyPrint());
sb.append(newClusterState.getRoutingNodes().prettyPrint());
logger.warn(sb.toString(), t);
// TODO: do we want to call updateTask.onFailure here?
}
}
class UpdateTask<T> extends SourcePrioritizedRunnable {
public final T task;
public final ClusterStateTaskConfig config;
public final ClusterStateTaskExecutor<T> executor;
public final ClusterStateTaskListener listener;
public final AtomicBoolean processed = new AtomicBoolean();
UpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
super(config.priority(), source);
this.task = task;
this.config = config;
this.executor = executor;
this.listener = listener;
}
@Override
public void run() {
runTasksForExecutor(executor);
}
}
@ -729,13 +779,24 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
private static class NoOpAckListener implements Discovery.AckListener {
private static class DelegetingAckListener implements Discovery.AckListener {
final private List<Discovery.AckListener> listeners;
private DelegetingAckListener(List<Discovery.AckListener> listeners) {
this.listeners = listeners;
}
@Override
public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
for (Discovery.AckListener listener : listeners) {
listener.onNodeAck(node, t);
}
}
@Override
public void onTimeout() {
throw new UnsupportedOperationException("no timeout delegation");
}
}
@ -743,20 +804,20 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private static final ESLogger logger = Loggers.getLogger(AckCountDownListener.class);
private final AckedClusterStateUpdateTask ackedUpdateTask;
private final AckedClusterStateTaskListener ackedTaskListener;
private final CountDown countDown;
private final DiscoveryNodes nodes;
private final long clusterStateVersion;
private final Future<?> ackTimeoutCallback;
private Throwable lastFailure;
AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) {
this.ackedUpdateTask = ackedUpdateTask;
AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) {
this.ackedTaskListener = ackedTaskListener;
this.clusterStateVersion = clusterStateVersion;
this.nodes = nodes;
int countDown = 0;
for (DiscoveryNode node : nodes) {
if (ackedUpdateTask.mustAck(node)) {
if (ackedTaskListener.mustAck(node)) {
countDown++;
}
}
@ -764,7 +825,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
countDown = Math.max(1, countDown);
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
this.countDown = new CountDown(countDown);
this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
onTimeout();
@ -774,7 +835,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
if (!ackedUpdateTask.mustAck(node)) {
if (!ackedTaskListener.mustAck(node)) {
//we always wait for the master ack anyway
if (!node.equals(nodes.masterNode())) {
return;
@ -790,7 +851,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (countDown.countDown()) {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
FutureUtils.cancel(ackTimeoutCallback);
ackedUpdateTask.onAllNodesAcked(lastFailure);
ackedTaskListener.onAllNodesAcked(lastFailure);
}
}
@ -798,7 +859,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
public void onTimeout() {
if (countDown.fastForward()) {
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
ackedUpdateTask.onAckTimeout();
ackedTaskListener.onAckTimeout();
}
}
}
@ -810,5 +871,4 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
InternalClusterService.this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
}
}
}

View File

@ -133,7 +133,7 @@ public class NodeJoinController extends AbstractComponent {
/** utility method to fail the given election context under the cluster state thread */
private void failContext(final ElectionContext context, final String reason, final Throwable throwable) {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public boolean runOnlyOnMaster() {
@ -231,7 +231,7 @@ public class NodeJoinController extends AbstractComponent {
}
final String source = "zen-disco-join(elected_as_master, [" + pendingMasterJoins + "] joins received)";
clusterService.submitStateUpdateTask(source, Priority.IMMEDIATE, new ProcessJoinsTask() {
clusterService.submitStateUpdateTask(source, new ProcessJoinsTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
// Take into account the previous known nodes, if they happen not to be available
@ -280,7 +280,7 @@ public class NodeJoinController extends AbstractComponent {
/** process all pending joins */
private void processJoins(String reason) {
clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", Priority.URGENT, new ProcessJoinsTask());
clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", new ProcessJoinsTask(Priority.URGENT));
}
@ -356,6 +356,10 @@ public class NodeJoinController extends AbstractComponent {
private final List<MembershipAction.JoinCallback> joinCallbacksToRespondTo = new ArrayList<>();
private boolean nodeAdded = false;
public ProcessJoinsTask(Priority priority) {
super(priority);
}
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder;

View File

@ -320,7 +320,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} catch (FailedToCommitClusterStateException t) {
// cluster service logs a WARN message
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes());
clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
return rejoin(currentState, "failed to publish to min_master_nodes");
@ -498,7 +498,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return;
}
if (localNodeMaster()) {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.id());
@ -538,7 +538,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().get(node.id()) == null) {
@ -587,7 +587,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// We only set the new value. If the master doesn't see enough nodes it will revoke it's mastership.
return;
}
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
// check if we have enough master nodes, if not, we need to move into joining the cluster again
@ -627,7 +627,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.info("master_left [{}], reason [{}]", cause, masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public boolean runOnlyOnMaster() {
@ -694,7 +694,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
void processNextPendingClusterState(String reason) {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public boolean runOnlyOnMaster() {
return false;
@ -1059,7 +1059,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return;
}
logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
clusterService.submitStateUpdateTask("ping from another master", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("ping from another master", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
@ -1114,7 +1114,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
class RejoinClusterRequestHandler implements TransportRequestHandler<RejoinClusterRequest> {
@Override
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public boolean runOnlyOnMaster() {

View File

@ -43,23 +43,14 @@ import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
/**
*
@ -711,8 +702,8 @@ public class ClusterServiceIT extends ESIntegTestCase {
.build();
internalCluster().startNode(settings);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
BlockingTask block = new BlockingTask();
clusterService.submitStateUpdateTask("test", Priority.IMMEDIATE, block);
BlockingTask block = new BlockingTask(Priority.IMMEDIATE);
clusterService.submitStateUpdateTask("test", block);
int taskCount = randomIntBetween(5, 20);
Priority[] priorities = Priority.values();
@ -721,7 +712,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
Priority priority = priorities[randomIntBetween(0, priorities.length - 1)];
clusterService.submitStateUpdateTask("test", priority, new PrioritiezedTask(priority, latch, tasks));
clusterService.submitStateUpdateTask("test", new PrioritiezedTask(priority, latch, tasks));
}
block.release();
@ -730,9 +721,9 @@ public class ClusterServiceIT extends ESIntegTestCase {
Priority prevPriority = null;
for (PrioritiezedTask task : tasks) {
if (prevPriority == null) {
prevPriority = task.priority;
prevPriority = task.priority();
} else {
assertThat(task.priority.sameOrAfter(prevPriority), is(true));
assertThat(task.priority().sameOrAfter(prevPriority), is(true));
}
}
}
@ -947,6 +938,10 @@ public class ClusterServiceIT extends ESIntegTestCase {
private static class BlockingTask extends ClusterStateUpdateTask {
private final CountDownLatch latch = new CountDownLatch(1);
public BlockingTask(Priority priority) {
super(priority);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch.await();
@ -965,12 +960,11 @@ public class ClusterServiceIT extends ESIntegTestCase {
private static class PrioritiezedTask extends ClusterStateUpdateTask {
private final Priority priority;
private final CountDownLatch latch;
private final List<PrioritiezedTask> tasks;
private PrioritiezedTask(Priority priority, CountDownLatch latch, List<PrioritiezedTask> tasks) {
this.priority = priority;
super(priority);
this.latch = latch;
this.tasks = tasks;
}

View File

@ -25,11 +25,7 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -59,16 +55,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
import org.elasticsearch.test.disruption.LongGCDisruption;
import org.elasticsearch.test.disruption.NetworkDelaysPartition;
import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
import org.elasticsearch.test.disruption.NetworkPartition;
import org.elasticsearch.test.disruption.NetworkUnresponsivePartition;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.disruption.*;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportException;
@ -78,31 +65,15 @@ import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@ESIntegTestCase.SuppressLocalMode
@ -650,7 +621,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// but will be queued and once the old master node un-freezes it gets executed.
// The old master node will send this update + the cluster state where he is flagged as master to the other
// nodes that follow the new master. These nodes should ignore this update.
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return ClusterState.builder(currentState).build();

View File

@ -27,13 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
@ -56,11 +50,7 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.nio.file.Files;
@ -407,7 +397,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
// disable relocations when we do this, to make sure the shards are not relocated from node2
// due to rebalancing, and delete its content
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)).get();
internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test");

View File

@ -20,12 +20,7 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.PendingClusterTask;
@ -208,7 +203,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
private void addBlock() {
// We should block after this task - add blocking cluster state update task
clusterService.submitStateUpdateTask("test_block", passThroughPriority, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("test_block", new ClusterStateUpdateTask(passThroughPriority) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
while(System.currentTimeMillis() < stopWaitingAt) {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.DummyTransportAddress;
@ -115,12 +114,12 @@ public class NoopClusterService implements ClusterService {
}
@Override
public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
}
@Override
public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
public <T> void submitStateUpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.logging.ESLogger;
@ -40,10 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
@ -183,31 +179,34 @@ public class TestClusterService implements ClusterService {
}
@Override
synchronized public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
logger.debug("processing [{}]", source);
if (state().nodes().localNodeMaster() == false && updateTask.runOnlyOnMaster()) {
updateTask.onNoLongerMaster(source);
logger.debug("failed [{}], no longer master", source);
return;
}
ClusterState newState;
ClusterState previousClusterState = state;
try {
newState = updateTask.execute(previousClusterState);
} catch (Exception e) {
updateTask.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", e));
return;
}
setStateAndNotifyListeners(newState);
if (updateTask instanceof ClusterStateUpdateTask) {
((ClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newState);
}
logger.debug("finished [{}]", source);
public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
submitStateUpdateTask(source, null, updateTask, updateTask, updateTask);
}
@Override
public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
submitStateUpdateTask(source, Priority.NORMAL, updateTask);
synchronized public <T> void submitStateUpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor, ClusterStateTaskListener listener) {
logger.debug("processing [{}]", source);
if (state().nodes().localNodeMaster() == false && executor.runOnlyOnMaster()) {
listener.onNoLongerMaster(source);
logger.debug("failed [{}], no longer master", source);
return;
}
ClusterStateTaskExecutor.Result result;
ClusterState previousClusterState = state;
try {
result = executor.execute(previousClusterState, Arrays.asList(task));
} catch (Exception e) {
result = new ClusterStateTaskExecutor.Result(previousClusterState, Arrays.asList(e));
}
if (result.failures.get(0) != null) {
listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]",
result.failures.get(0)));
return;
}
setStateAndNotifyListeners(result.resultingState);
listener.clusterStateProcessed(source, previousClusterState, result.resultingState);
logger.debug("finished [{}]", source);
}
@Override

View File

@ -58,7 +58,7 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption {
boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1));
assert success : "startDisrupting called without waiting on stopDistrupting to complete";
final CountDownLatch started = new CountDownLatch(1);
clusterService.submitStateUpdateTask("service_disruption_block", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public boolean runOnlyOnMaster() {

View File

@ -102,7 +102,7 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
return false;
}
final AtomicBoolean stopped = new AtomicBoolean(false);
clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("service_disruption_delay", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public boolean runOnlyOnMaster() {