diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 0f0b2680f61..f1cc59ba760 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -74,7 +74,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener listener) { if (request.waitForEvents() != null) { final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis(); - clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) { @Override public ClusterState execute(ClusterState currentState) { return currentState; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index f916c37aec2..d7ec84fb7a5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -68,7 +68,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction listener) { - clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("cluster_reroute (api)", new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { private volatile ClusterState clusterStateToSend; private volatile RoutingExplanations explanations; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index b0934781dcd..73d14a2bb11 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -91,7 +91,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct final Settings.Builder transientUpdates = Settings.settingsBuilder(); final Settings.Builder persistentUpdates = Settings.settingsBuilder(); - clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("cluster_update_settings", + new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { private volatile boolean changed = false; @@ -132,7 +133,8 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct // in the components (e.g. FilterAllocationDecider), so the changes made by the first call aren't visible // to the components until the ClusterStateListener instances have been invoked, but are visible after // the first update task has been completed. - clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override public boolean mustAck(DiscoveryNode discoveryNode) { diff --git a/core/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java b/core/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java new file mode 100644 index 00000000000..cdd9b2204ff --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.TimeValue; + +public interface AckedClusterStateTaskListener extends ClusterStateTaskListener { + + /** + * Called to determine which nodes the acknowledgement is expected from + * + * @param discoveryNode a node + * @return true if the node is expected to send ack back, false otherwise + */ + boolean mustAck(DiscoveryNode discoveryNode); + + /** + * Called once all the nodes have acknowledged the cluster state update request. Must be + * very lightweight execution, since it gets executed on the cluster service thread. + * + * @param t optional error that might have been thrown + */ + void onAllNodesAcked(@Nullable Throwable t); + + /** + * Called once the acknowledgement timeout defined by + * {@link AckedClusterStateUpdateTask#ackTimeout()} has expired + */ + void onAckTimeout(); + + /** + * Acknowledgement timeout, maximum time interval to wait for acknowledgements + */ + TimeValue ackTimeout(); + +} diff --git a/core/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/core/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java index 21c6cd5032a..b833f6e1879 100644 --- a/core/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java +++ b/core/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -22,18 +22,24 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; /** * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when * all the nodes have acknowledged a cluster state update request */ -public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask { +public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask implements AckedClusterStateTaskListener { private final ActionListener listener; private final AckedRequest request; protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener listener) { + this(Priority.NORMAL, request, listener); + } + + protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener listener) { + super(priority); this.listener = listener; this.request = request; } diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java index 8a3a287bac8..d3985bd2e78 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.unit.TimeValue; @@ -101,12 +100,16 @@ public interface ClusterService extends LifecycleComponent { void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener); /** - * Submits a task that will update the cluster state. + * Submits a task that will update the cluster state, using the given config. result will communicated + * to the given listener */ - void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask); + void submitStateUpdateTask(final String source, final T task, + final ClusterStateTaskConfig config, + final ClusterStateTaskExecutor executor, + final ClusterStateTaskListener listener); /** - * Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}). + * Submits a task that will update the cluster state; */ void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask); diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java new file mode 100644 index 00000000000..662095798af --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.unit.TimeValue; + +public interface ClusterStateTaskConfig { + + /** + * If the cluster state update task wasn't processed by the provided timeout, call + * {@link ClusterStateTaskListener#onFailure(String, Throwable)}. May return null to indicate no timeout is needed (default). + */ + @Nullable + TimeValue timeout(); + + Priority priority(); + + static ClusterStateTaskConfig build(Priority priority) { + return new Basic(priority, null); + } + + static ClusterStateTaskConfig build(Priority priority, TimeValue timeout) { + return new Basic(priority, timeout); + } + + + class Basic implements ClusterStateTaskConfig { + final TimeValue timeout; + final Priority priority; + + public Basic(Priority priority, TimeValue timeout) { + this.timeout = timeout; + this.priority = priority; + } + + @Override + public TimeValue timeout() { + return timeout; + } + + @Override + public Priority priority() { + return priority; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java new file mode 100644 index 00000000000..861b924c52e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +import java.util.Arrays; +import java.util.List; + +public interface ClusterStateTaskExecutor { + /** + * Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state + * should be changed. + */ + Result execute(ClusterState currentState, List tasks) throws Exception; + + /** + * indicates whether this task should only run if current node is master + */ + default boolean runOnlyOnMaster() { + return true; + } + + class Result { + final public ClusterState resultingState; + final public List failures; + + public Result(ClusterState resultingState, int numberOfTasks) { + this.resultingState = resultingState; + failures = Arrays.asList(new Throwable[numberOfTasks]); + } + + public Result(ClusterState resultingState, List failures) { + this.resultingState = resultingState; + this.failures = failures; + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java new file mode 100644 index 00000000000..16945d91971 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskListener.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; + +import java.util.List; + +public interface ClusterStateTaskListener { + + /** + * A callback called when execute fails. + */ + void onFailure(String source, Throwable t); + + /** + * called when the task was rejected because the local node is no longer master + */ + default void onNoLongerMaster(String source) { + onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]")); + } + + /** + * Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed + * properly by all listeners. + */ + default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + } + + ; + +} diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index 7fef94d5c17..17c4635c7de 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -20,13 +20,31 @@ package org.elasticsearch.cluster; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; + +import java.util.List; /** * A task that can update the cluster state. */ -abstract public class ClusterStateUpdateTask { +abstract public class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor, ClusterStateTaskListener { + + final private Priority priority; + + public ClusterStateUpdateTask() { + this(Priority.NORMAL); + } + + public ClusterStateUpdateTask(Priority priority) { + this.priority = priority; + } + + @Override + final public Result execute(ClusterState currentState, List tasks) throws Exception { + ClusterState result = execute(currentState); + return new Result(result, tasks.size()); + } /** * Update the cluster state based on the current state. Return the *same instance* if no state @@ -39,28 +57,6 @@ abstract public class ClusterStateUpdateTask { */ abstract public void onFailure(String source, Throwable t); - - /** - * indicates whether this task should only run if current node is master - */ - public boolean runOnlyOnMaster() { - return true; - } - - /** - * called when the task was rejected because the local node is no longer master - */ - public void onNoLongerMaster(String source) { - onFailure(source, new NotMasterException("no longer master. source: [" + source + "]")); - } - - /** - * Called when the result of the {@link #execute(ClusterState)} have been processed - * properly by all listeners. - */ - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - } - /** * If the cluster state update task wasn't processed by the provided timeout, call * {@link #onFailure(String, Throwable)}. May return null to indicate no timeout is needed (default). @@ -70,5 +66,8 @@ abstract public class ClusterStateUpdateTask { return null; } - + @Override + public Priority priority() { + return priority; + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 83897baa50d..1b43a33627b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -144,7 +144,8 @@ public class ShardStateAction extends AbstractComponent { private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); failedShardQueue.add(shardRoutingEntry); - clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", Priority.HIGH, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", + new ClusterStateUpdateTask(Priority.HIGH) { @Override public ClusterState execute(ClusterState currentState) { @@ -198,8 +199,13 @@ public class ShardStateAction extends AbstractComponent { // process started events as fast as possible, to make shards available startedShardsQueue.add(shardRoutingEntry); - clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", Priority.URGENT, + clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", new ClusterStateUpdateTask() { + @Override + public Priority priority() { + return Priority.URGENT; + } + @Override public ClusterState execute(ClusterState currentState) { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 5b83870cc3c..d3ba811a6e5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -170,12 +170,12 @@ public class MetaDataCreateIndexService extends AbstractComponent { updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX); request.settings(updatedSettingsBuilder.build()); - clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + @Override + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); + } @Override public ClusterState execute(ClusterState currentState) throws Exception { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index fc87bae6507..54c014fb4ed 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -39,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collection; -import java.util.Locale; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -71,7 +70,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { Collection indices = Arrays.asList(request.indices); final DeleteIndexListener listener = new DeleteIndexListener(userListener); - clusterService.submitStateUpdateTask("delete-index " + indices, Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("delete-index " + indices, new ClusterStateUpdateTask(Priority.URGENT) { @Override public TimeValue timeout() { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java index 80ff68e6cf9..b13f9711bef 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java @@ -62,7 +62,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent { } public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener listener) { - clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("index-aliases", new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index b1c9f0749b0..1fa1b702f66 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -76,7 +76,7 @@ public class MetaDataIndexStateService extends AbstractComponent { } final String indicesAsString = Arrays.toString(request.indices()); - clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("close-indices " + indicesAsString, new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); @@ -140,7 +140,7 @@ public class MetaDataIndexStateService extends AbstractComponent { } final String indicesAsString = Arrays.toString(request.indices()); - clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("open-indices " + indicesAsString, new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java index 13823e8ebdd..3d7d19b27b9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java @@ -56,7 +56,7 @@ public class MetaDataIndexTemplateService extends AbstractComponent { } public void removeTemplates(final RemoveRequest request, final RemoveListener listener) { - clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT) { @Override public TimeValue timeout() { @@ -143,7 +143,8 @@ public class MetaDataIndexTemplateService extends AbstractComponent { } final IndexTemplateMetaData template = templateBuilder.build(); - clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", + new ClusterStateUpdateTask(Priority.URGENT) { @Override public TimeValue timeout() { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 725f06d9fae..215dde061db 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -22,17 +22,17 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.NodeServicesProvider; @@ -44,6 +44,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidTypeNameException; import org.elasticsearch.percolator.PercolatorService; +import java.io.IOException; import java.util.*; /** * Service responsible for submitting mapping changes @@ -53,13 +54,11 @@ public class MetaDataMappingService extends AbstractComponent { private final ClusterService clusterService; private final IndicesService indicesService; - // the mutex protect all the refreshOrUpdate variables! - private final Object refreshOrUpdateMutex = new Object(); - private final List refreshOrUpdateQueue = new ArrayList<>(); - private long refreshOrUpdateInsertOrder; - private long refreshOrUpdateProcessedInsertOrder; + final ClusterStateTaskExecutor refreshExectuor = new RefreshTaskExecutor(); + final ClusterStateTaskExecutor putMappingExecutor = new PutMappingExecutor(); private final NodeServicesProvider nodeServicesProvider; + @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeServicesProvider nodeServicesProvider) { super(settings); @@ -68,37 +67,23 @@ public class MetaDataMappingService extends AbstractComponent { this.nodeServicesProvider = nodeServicesProvider; } - static class MappingTask { + static class RefreshTask { final String index; final String indexUUID; - - MappingTask(String index, final String indexUUID) { - this.index = index; - this.indexUUID = indexUUID; - } - } - - static class RefreshTask extends MappingTask { final String[] types; RefreshTask(String index, final String indexUUID, String[] types) { - super(index, indexUUID); + this.index = index; + this.indexUUID = indexUUID; this.types = types; } } - static class UpdateTask extends MappingTask { - final String type; - final CompressedXContent mappingSource; - final String nodeId; // null fr unknown - final ActionListener listener; - - UpdateTask(String index, String indexUUID, String type, CompressedXContent mappingSource, String nodeId, ActionListener listener) { - super(index, indexUUID); - this.type = type; - this.mappingSource = mappingSource; - this.nodeId = nodeId; - this.listener = listener; + class RefreshTaskExecutor implements ClusterStateTaskExecutor { + @Override + public Result execute(ClusterState currentState, List tasks) throws Exception { + ClusterState newClusterState = executeRefresh(currentState, tasks); + return new Result(newClusterState, tasks.size()); } } @@ -107,39 +92,19 @@ public class MetaDataMappingService extends AbstractComponent { * as possible so we won't create the same index all the time for example for the updates on the same mapping * and generate a single cluster change event out of all of those. */ - Tuple> executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception { - final List allTasks = new ArrayList<>(); - - synchronized (refreshOrUpdateMutex) { - if (refreshOrUpdateQueue.isEmpty()) { - return Tuple.tuple(currentState, allTasks); - } - - // we already processed this task in a bulk manner in a previous cluster event, simply ignore - // it so we will let other tasks get in and processed ones, we will handle the queued ones - // later on in a subsequent cluster state event - if (insertionOrder < refreshOrUpdateProcessedInsertOrder) { - return Tuple.tuple(currentState, allTasks); - } - - allTasks.addAll(refreshOrUpdateQueue); - refreshOrUpdateQueue.clear(); - - refreshOrUpdateProcessedInsertOrder = refreshOrUpdateInsertOrder; - } - + ClusterState executeRefresh(final ClusterState currentState, final List allTasks) throws Exception { if (allTasks.isEmpty()) { - return Tuple.tuple(currentState, allTasks); + return currentState; } // break down to tasks per index, so we can optimize the on demand index service creation // to only happen for the duration of a single index processing of its respective events - Map> tasksPerIndex = new HashMap<>(); - for (MappingTask task : allTasks) { + Map> tasksPerIndex = new HashMap<>(); + for (RefreshTask task : allTasks) { if (task.index == null) { logger.debug("ignoring a mapping task of type [{}] with a null index.", task); } - List indexTasks = tasksPerIndex.get(task.index); + List indexTasks = tasksPerIndex.get(task.index); if (indexTasks == null) { indexTasks = new ArrayList<>(); tasksPerIndex.put(task.index, indexTasks); @@ -150,7 +115,7 @@ public class MetaDataMappingService extends AbstractComponent { boolean dirty = false; MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - for (Map.Entry> entry : tasksPerIndex.entrySet()) { + for (Map.Entry> entry : tasksPerIndex.entrySet()) { String index = entry.getKey(); IndexMetaData indexMetaData = mdBuilder.get(index); if (indexMetaData == null) { @@ -160,9 +125,9 @@ public class MetaDataMappingService extends AbstractComponent { } // the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep // the latest (based on order) update mapping one per node - List allIndexTasks = entry.getValue(); - List tasks = new ArrayList<>(); - for (MappingTask task : allIndexTasks) { + List allIndexTasks = entry.getValue(); + List tasks = new ArrayList<>(); + for (RefreshTask task : allIndexTasks) { if (!indexMetaData.isSameUUID(task.indexUUID)) { logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task); continue; @@ -178,12 +143,8 @@ public class MetaDataMappingService extends AbstractComponent { indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST); removeIndex = true; Set typesToIntroduce = new HashSet<>(); - for (MappingTask task : tasks) { - if (task instanceof UpdateTask) { - typesToIntroduce.add(((UpdateTask) task).type); - } else if (task instanceof RefreshTask) { - Collections.addAll(typesToIntroduce, ((RefreshTask) task).types); - } + for (RefreshTask task : tasks) { + Collections.addAll(typesToIntroduce, task.types); } for (String type : typesToIntroduce) { // only add the current relevant mapping (if exists) @@ -209,80 +170,42 @@ public class MetaDataMappingService extends AbstractComponent { } if (!dirty) { - return Tuple.tuple(currentState, allTasks); + return currentState; } - return Tuple.tuple(ClusterState.builder(currentState).metaData(mdBuilder).build(), allTasks); + return ClusterState.builder(currentState).metaData(mdBuilder).build(); } - private boolean processIndexMappingTasks(List tasks, IndexService indexService, IndexMetaData.Builder builder) { + private boolean processIndexMappingTasks(List tasks, IndexService indexService, IndexMetaData.Builder builder) { boolean dirty = false; String index = indexService.index().name(); // keep track of what we already refreshed, no need to refresh it again... Set processedRefreshes = new HashSet<>(); - for (MappingTask task : tasks) { - if (task instanceof RefreshTask) { - RefreshTask refreshTask = (RefreshTask) task; - try { - List updatedTypes = new ArrayList<>(); - for (String type : refreshTask.types) { - if (processedRefreshes.contains(type)) { - continue; - } - DocumentMapper mapper = indexService.mapperService().documentMapper(type); - if (mapper == null) { - continue; - } - if (!mapper.mappingSource().equals(builder.mapping(type).source())) { - updatedTypes.add(type); - builder.putMapping(new MappingMetaData(mapper)); - } - processedRefreshes.add(type); - } - - if (updatedTypes.isEmpty()) { + for (RefreshTask refreshTask : tasks) { + try { + List updatedTypes = new ArrayList<>(); + for (String type : refreshTask.types) { + if (processedRefreshes.contains(type)) { continue; } - - logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); - dirty = true; - } catch (Throwable t) { - logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types); - } - } else if (task instanceof UpdateTask) { - UpdateTask updateTask = (UpdateTask) task; - try { - String type = updateTask.type; - CompressedXContent mappingSource = updateTask.mappingSource; - - MappingMetaData mappingMetaData = builder.mapping(type); - if (mappingMetaData != null && mappingMetaData.source().equals(mappingSource)) { - logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type); + DocumentMapper mapper = indexService.mapperService().documentMapper(type); + if (mapper == null) { continue; } - - DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false, true); + if (!mapper.mappingSource().equals(builder.mapping(type).source())) { + updatedTypes.add(type); + builder.putMapping(new MappingMetaData(mapper)); + } processedRefreshes.add(type); - - // if we end up with the same mapping as the original once, ignore - if (mappingMetaData != null && mappingMetaData.source().equals(updatedMapper.mappingSource())) { - logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type); - continue; - } - - // build the updated mapping source - if (logger.isDebugEnabled()) { - logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource()); - } else if (logger.isInfoEnabled()) { - logger.info("[{}] update_mapping [{}] (dynamic)", index, type); - } - - builder.putMapping(new MappingMetaData(updatedMapper)); - dirty = true; - } catch (Throwable t) { - logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type); } - } else { - logger.warn("illegal state, got wrong mapping task type [{}]", task); + + if (updatedTypes.isEmpty()) { + continue; + } + + logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); + dirty = true; + } catch (Throwable t) { + logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types); } } return dirty; @@ -292,197 +215,203 @@ public class MetaDataMappingService extends AbstractComponent { * Refreshes mappings if they are not the same between original and parsed version */ public void refreshMapping(final String index, final String indexUUID, final String... types) { - final long insertOrder; - synchronized (refreshOrUpdateMutex) { - insertOrder = ++refreshOrUpdateInsertOrder; - refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types)); - } - clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() { - private volatile List allTasks; + final RefreshTask refreshTask = new RefreshTask(index, indexUUID, types); + clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", + refreshTask, + ClusterStateTaskConfig.build(Priority.HIGH), + refreshExectuor, + (source, t) -> logger.warn("failure during [{}]", t, source) + ); + } - @Override - public void onFailure(String source, Throwable t) { - logger.warn("failure during [{}]", t, source); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Tuple> tuple = executeRefreshOrUpdate(currentState, insertOrder); - this.allTasks = tuple.v2(); - return tuple.v1(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (allTasks == null) { - return; + class PutMappingExecutor implements ClusterStateTaskExecutor { + @Override + public Result execute(ClusterState currentState, List tasks) throws Exception { + List indicesToClose = new ArrayList<>(); + ArrayList failures = new ArrayList<>(tasks.size()); + try { + // precreate incoming indices; + for (PutMappingClusterStateUpdateRequest request : tasks) { + // failures here mean something is broken with our cluster state - fail all tasks by letting exceptions bubble up + for (String index : request.indices()) { + if (currentState.metaData().hasIndex(index)) { + // if we don't have the index, we will throw exceptions later; + if (indicesService.hasIndex(index) == false) { + final IndexMetaData indexMetaData = currentState.metaData().index(index); + IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST); + indicesToClose.add(indexMetaData.getIndex()); + // make sure to add custom default mapping if exists + if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) { + indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes()); + } + // only add the current relevant mapping (if exists) + if (indexMetaData.getMappings().containsKey(request.type())) { + indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes()); + } + } + } + } } - for (Object task : allTasks) { - if (task instanceof UpdateTask) { - UpdateTask uTask = (UpdateTask) task; - ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true); - uTask.listener.onResponse(response); + for (PutMappingClusterStateUpdateRequest request : tasks) { + try { + currentState = applyRequest(currentState, request); + failures.add(null); + } catch (Throwable t) { + failures.add(t); + } + } + + return new Result(currentState, failures); + } finally { + for (String index : indicesToClose) { + indicesService.removeIndex(index, "created for mapping processing"); + } + } + } + + private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request) throws IOException { + Map newMappers = new HashMap<>(); + Map existingMappers = new HashMap<>(); + for (String index : request.indices()) { + IndexService indexService = indicesService.indexServiceSafe(index); + // try and parse it (no need to add it here) so we can bail early in case of parsing exception + DocumentMapper newMapper; + DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type()); + if (MapperService.DEFAULT_MAPPING.equals(request.type())) { + // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default + newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), false); + } else { + newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), existingMapper == null); + if (existingMapper != null) { + // first, simulate + MergeResult mergeResult = existingMapper.merge(newMapper.mapping(), true, request.updateAllTypes()); + // if we have conflicts, throw an exception + if (mergeResult.hasConflicts()) { + throw new MergeMappingException(mergeResult.buildConflicts()); + } + } else { + // TODO: can we find a better place for this validation? + // The reason this validation is here is that the mapper service doesn't learn about + // new types all at once , which can create a false error. + + // For example in MapperService we can't distinguish between a create index api call + // and a put mapping api call, so we don't which type did exist before. + // Also the order of the mappings may be backwards. + if (newMapper.parentFieldMapper().active()) { + IndexMetaData indexMetaData = currentState.metaData().index(index); + for (ObjectCursor mapping : indexMetaData.getMappings().values()) { + if (newMapper.parentFieldMapper().type().equals(mapping.value.type())) { + throw new IllegalArgumentException("can't add a _parent field that points to an already existing type"); + } + } + } + } + } + newMappers.put(index, newMapper); + if (existingMapper != null) { + existingMappers.put(index, existingMapper); + } + } + + String mappingType = request.type(); + if (mappingType == null) { + mappingType = newMappers.values().iterator().next().type(); + } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { + throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition"); + } + if (!MapperService.DEFAULT_MAPPING.equals(mappingType) && !PercolatorService.TYPE_NAME.equals(mappingType) && mappingType.charAt(0) == '_') { + throw new InvalidTypeNameException("Document mapping type name can't start with '_'"); + } + final Map mappings = new HashMap<>(); + for (Map.Entry entry : newMappers.entrySet()) { + String index = entry.getKey(); + // do the actual merge here on the master, and update the mapping source + DocumentMapper newMapper = entry.getValue(); + IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + continue; + } + + CompressedXContent existingSource = null; + if (existingMappers.containsKey(entry.getKey())) { + existingSource = existingMappers.get(entry.getKey()).mappingSource(); + } + DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false, request.updateAllTypes()); + CompressedXContent updatedSource = mergedMapper.mappingSource(); + + if (existingSource != null) { + if (existingSource.equals(updatedSource)) { + // same source, no changes, ignore it + } else { + // use the merged mapping source + mappings.put(index, new MappingMetaData(mergedMapper)); + if (logger.isDebugEnabled()) { + logger.debug("[{}] update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource); + } else if (logger.isInfoEnabled()) { + logger.info("[{}] update_mapping [{}]", index, mergedMapper.type()); + } + + } + } else { + mappings.put(index, new MappingMetaData(mergedMapper)); + if (logger.isDebugEnabled()) { + logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), updatedSource); + } else if (logger.isInfoEnabled()) { + logger.info("[{}] create_mapping [{}]", index, newMapper.type()); } } } - }); + if (mappings.isEmpty()) { + // no changes, return + return currentState; + } + MetaData.Builder builder = MetaData.builder(currentState.metaData()); + for (String indexName : request.indices()) { + IndexMetaData indexMetaData = currentState.metaData().index(indexName); + if (indexMetaData == null) { + throw new IndexNotFoundException(indexName); + } + MappingMetaData mappingMd = mappings.get(indexName); + if (mappingMd != null) { + builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd)); + } + } + + return ClusterState.builder(currentState).metaData(builder).build(); + } } public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", + request, + ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()), + putMappingExecutor, + new AckedClusterStateTaskListener() { - clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } - - @Override - public ClusterState execute(final ClusterState currentState) throws Exception { - List indicesToClose = new ArrayList<>(); - try { - for (String index : request.indices()) { - if (!currentState.metaData().hasIndex(index)) { - throw new IndexNotFoundException(index); - } + @Override + public void onFailure(String source, Throwable t) { + listener.onFailure(t); } - // pre create indices here and add mappings to them so we can merge the mappings here if needed - for (String index : request.indices()) { - if (indicesService.hasIndex(index)) { - continue; - } - final IndexMetaData indexMetaData = currentState.metaData().index(index); - IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.EMPTY_LIST); - indicesToClose.add(indexMetaData.getIndex()); - // make sure to add custom default mapping if exists - if (indexMetaData.getMappings().containsKey(MapperService.DEFAULT_MAPPING)) { - indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.getMappings().get(MapperService.DEFAULT_MAPPING).source(), false, request.updateAllTypes()); - } - // only add the current relevant mapping (if exists) - if (indexMetaData.getMappings().containsKey(request.type())) { - indexService.mapperService().merge(request.type(), indexMetaData.getMappings().get(request.type()).source(), false, request.updateAllTypes()); - } + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; } - Map newMappers = new HashMap<>(); - Map existingMappers = new HashMap<>(); - for (String index : request.indices()) { - IndexService indexService = indicesService.indexServiceSafe(index); - // try and parse it (no need to add it here) so we can bail early in case of parsing exception - DocumentMapper newMapper; - DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type()); - if (MapperService.DEFAULT_MAPPING.equals(request.type())) { - // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default - newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), false); - } else { - newMapper = indexService.mapperService().parse(request.type(), new CompressedXContent(request.source()), existingMapper == null); - if (existingMapper != null) { - // first, simulate - MergeResult mergeResult = existingMapper.merge(newMapper.mapping(), true, request.updateAllTypes()); - // if we have conflicts, throw an exception - if (mergeResult.hasConflicts()) { - throw new MergeMappingException(mergeResult.buildConflicts()); - } - } else { - // TODO: can we find a better place for this validation? - // The reason this validation is here is that the mapper service doesn't learn about - // new types all at once , which can create a false error. - - // For example in MapperService we can't distinguish between a create index api call - // and a put mapping api call, so we don't which type did exist before. - // Also the order of the mappings may be backwards. - if (newMapper.parentFieldMapper().active()) { - IndexMetaData indexMetaData = currentState.metaData().index(index); - for (ObjectCursor mapping : indexMetaData.getMappings().values()) { - if (newMapper.parentFieldMapper().type().equals(mapping.value.type())) { - throw new IllegalArgumentException("can't add a _parent field that points to an already existing type"); - } - } - } - } - } - - - newMappers.put(index, newMapper); - if (existingMapper != null) { - existingMappers.put(index, existingMapper); - } + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); } - String mappingType = request.type(); - if (mappingType == null) { - mappingType = newMappers.values().iterator().next().type(); - } else if (!mappingType.equals(newMappers.values().iterator().next().type())) { - throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition"); - } - if (!MapperService.DEFAULT_MAPPING.equals(mappingType) && !PercolatorService.TYPE_NAME.equals(mappingType) && mappingType.charAt(0) == '_') { - throw new InvalidTypeNameException("Document mapping type name can't start with '_'"); + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(false)); } - final Map mappings = new HashMap<>(); - for (Map.Entry entry : newMappers.entrySet()) { - String index = entry.getKey(); - // do the actual merge here on the master, and update the mapping source - DocumentMapper newMapper = entry.getValue(); - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - continue; - } - - CompressedXContent existingSource = null; - if (existingMappers.containsKey(entry.getKey())) { - existingSource = existingMappers.get(entry.getKey()).mappingSource(); - } - DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false, request.updateAllTypes()); - CompressedXContent updatedSource = mergedMapper.mappingSource(); - - if (existingSource != null) { - if (existingSource.equals(updatedSource)) { - // same source, no changes, ignore it - } else { - // use the merged mapping source - mappings.put(index, new MappingMetaData(mergedMapper)); - if (logger.isDebugEnabled()) { - logger.debug("[{}] update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource); - } else if (logger.isInfoEnabled()) { - logger.info("[{}] update_mapping [{}]", index, mergedMapper.type()); - } - } - } else { - mappings.put(index, new MappingMetaData(mergedMapper)); - if (logger.isDebugEnabled()) { - logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), updatedSource); - } else if (logger.isInfoEnabled()) { - logger.info("[{}] create_mapping [{}]", index, newMapper.type()); - } - } + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); } - - if (mappings.isEmpty()) { - // no changes, return - return currentState; - } - - MetaData.Builder builder = MetaData.builder(currentState.metaData()); - for (String indexName : request.indices()) { - IndexMetaData indexMetaData = currentState.metaData().index(indexName); - if (indexMetaData == null) { - throw new IndexNotFoundException(indexName); - } - MappingMetaData mappingMd = mappings.get(indexName); - if (mappingMd != null) { - builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd)); - } - } - - return ClusterState.builder(currentState).metaData(builder).build(); - } finally { - for (String index : indicesToClose) { - indicesService.removeIndex(index, "created for mapping processing"); - } - } - } - }); + }); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index 1a928dd41ea..eaa1eefd25e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -24,11 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; @@ -44,13 +40,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.settings.IndexDynamicSettings; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; +import java.util.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -219,7 +209,8 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } final Settings openSettings = updatedSettingsBuilder.build(); - clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("update-settings", + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { @@ -334,7 +325,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements public void upgradeIndexSettings(final UpgradeSettingsClusterStateUpdateRequest request, final ActionListener listener) { - clusterService.submitStateUpdateTask("update-index-compatibility-versions", Priority.URGENT, new AckedClusterStateUpdateTask(request, listener) { + clusterService.submitStateUpdateTask("update-index-compatibility-versions", new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 6f43e880e3f..5cd4366bea4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -147,7 +147,7 @@ public class RoutingService extends AbstractLifecycleComponent i return; } logger.trace("rerouting {}", reason); - clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) { @Override public ClusterState execute(ClusterState currentState) { rerouting.set(false); diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index c2300739a7d..ce936c83d47 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -20,16 +20,8 @@ package org.elasticsearch.cluster.service; import org.elasticsearch.Version; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ClusterState.Builder; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.LocalNodeMasterListener; -import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; @@ -41,6 +33,7 @@ import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; @@ -49,13 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.PrioritizedRunnable; +import org.elasticsearch.common.util.concurrent.*; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; @@ -63,18 +50,10 @@ import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -111,6 +90,7 @@ public class InternalClusterService extends AbstractLifecycleComponent priorityClusterStateListeners = new CopyOnWriteArrayList<>(); private final Collection clusterStateListeners = new CopyOnWriteArrayList<>(); private final Collection lastClusterStateListeners = new CopyOnWriteArrayList<>(); + private final Map> updateTasksPerExecutor = new HashMap<>(); // TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API private final Collection postAppliedListeners = new CopyOnWriteArrayList<>(); private final Iterable preAppliedListeners = Iterables.concat(priorityClusterStateListeners, clusterStateListeners, lastClusterStateListeners); @@ -289,30 +269,47 @@ public class InternalClusterService extends AbstractLifecycleComponent void submitStateUpdateTask(final String source, final T task, + final ClusterStateTaskConfig config, + final ClusterStateTaskExecutor executor, + final ClusterStateTaskListener listener + ) { if (!lifecycle.started()) { return; } try { - final UpdateTask task = new UpdateTask(source, priority, updateTask); - if (updateTask.timeout() != null) { - updateTasksExecutor.execute(task, threadPool.scheduler(), updateTask.timeout(), new Runnable() { + final UpdateTask updateTask = new UpdateTask<>(source, task, config, executor, listener); + + synchronized (updateTasksPerExecutor) { + List pendingTasks = updateTasksPerExecutor.get(executor); + if (pendingTasks == null) { + pendingTasks = new ArrayList<>(); + updateTasksPerExecutor.put(executor, pendingTasks); + } + pendingTasks.add(updateTask); + } + + if (config.timeout() != null) { + updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), new Runnable() { @Override public void run() { threadPool.generic().execute(new Runnable() { @Override public void run() { - updateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(updateTask.timeout(), task.source())); + if (updateTask.processed.getAndSet(true) == false) { + listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)); + } } }); } }); } else { - updateTasksExecutor.execute(task); + updateTasksExecutor.execute(updateTask); } } catch (EsRejectedExecutionException e) { // ignore cases where we are shutting down..., there is really nothing interesting @@ -379,188 +376,241 @@ public class InternalClusterService extends AbstractLifecycleComponent void runTasksForExecutor(ClusterStateTaskExecutor executor) { + final ArrayList> toExecute = new ArrayList<>(); + final ArrayList sources = new ArrayList<>(); + synchronized (updateTasksPerExecutor) { + List pending = updateTasksPerExecutor.remove(executor); + if (pending != null) { + for (Iterator iter = pending.iterator(); iter.hasNext(); ) { + UpdateTask task = iter.next(); + if (task.processed.getAndSet(true) == false) { + logger.trace("will process [{}]", task.source); + toExecute.add((UpdateTask) task); + sources.add(task.source); + } else { + logger.trace("skipping [{}], already processed", task.source); + } + } + } + } + if (toExecute.isEmpty()) { + return; + } + final String source = Strings.collectionToCommaDelimitedString(sources); + if (!lifecycle.started()) { + logger.debug("processing [{}]: ignoring, cluster_service not started", source); + return; + } + logger.debug("processing [{}]: execute", source); + ClusterState previousClusterState = clusterState; + if (!previousClusterState.nodes().localNodeMaster() && executor.runOnlyOnMaster()) { + logger.debug("failing [{}]: local node is no longer master", source); + toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source)); + return; + } + ClusterStateTaskExecutor.Result result; + long startTimeNS = System.nanoTime(); + try { + List inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); + result = executor.execute(previousClusterState, inputs); + } catch (Throwable e) { + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n"); + sb.append(previousClusterState.nodes().prettyPrint()); + sb.append(previousClusterState.routingTable().prettyPrint()); + sb.append(previousClusterState.getRoutingNodes().prettyPrint()); + logger.trace(sb.toString(), e); + } + warnAboutSlowTaskIfNeeded(executionTime, source); + result = new ClusterStateTaskExecutor.Result(previousClusterState, Collections.nCopies(toExecute.size(), e)); + } + assert result.failures.size() == toExecute.size(); - public final ClusterStateUpdateTask updateTask; - - UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) { - super(priority, source); - this.updateTask = updateTask; + ClusterState newClusterState = result.resultingState; + final ArrayList> proccessedListeners = new ArrayList<>(); + // fail all tasks that have failed and extract those that are waiting for results + for (int i = 0; i < toExecute.size(); i++) { + final UpdateTask task = toExecute.get(i); + final Throwable failure = result.failures.get(i); + if (failure == null) { + proccessedListeners.add(task); + } else { + task.listener.onFailure(task.source, failure); + } } - @Override - public void run() { - if (!lifecycle.started()) { - logger.debug("processing [{}]: ignoring, cluster_service not started", source); - return; - } - logger.debug("processing [{}]: execute", source); - ClusterState previousClusterState = clusterState; - if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) { - logger.debug("failing [{}]: local node is no longer master", source); - updateTask.onNoLongerMaster(source); - return; - } - ClusterState newClusterState; - long startTimeNS = System.nanoTime(); - try { - newClusterState = updateTask.execute(previousClusterState); - } catch (Throwable e) { - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n"); - sb.append(previousClusterState.nodes().prettyPrint()); - sb.append(previousClusterState.routingTable().prettyPrint()); - sb.append(previousClusterState.getRoutingNodes().prettyPrint()); - logger.trace(sb.toString(), e); - } - warnAboutSlowTaskIfNeeded(executionTime, source); - updateTask.onFailure(source, e); - return; - } - - if (previousClusterState == newClusterState) { - if (updateTask instanceof AckedClusterStateUpdateTask) { + if (previousClusterState == newClusterState) { + for (UpdateTask task : proccessedListeners) { + if (task.listener instanceof AckedClusterStateTaskListener) { //no need to wait for ack if nothing changed, the update can be counted as acknowledged - ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null); + ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null); } - updateTask.clusterStateProcessed(source, previousClusterState, newClusterState); - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); - logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime); - warnAboutSlowTaskIfNeeded(executionTime, source); - return; + task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState); } + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); + logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime); + warnAboutSlowTaskIfNeeded(executionTime, source); + return; + } - try { - Discovery.AckListener ackListener = new NoOpAckListener(); - if (newClusterState.nodes().localNodeMaster()) { - // only the master controls the version numbers - Builder builder = ClusterState.builder(newClusterState).incrementVersion(); - if (previousClusterState.routingTable() != newClusterState.routingTable()) { - builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build()); - } - if (previousClusterState.metaData() != newClusterState.metaData()) { - builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); - } - newClusterState = builder.build(); - - if (updateTask instanceof AckedClusterStateUpdateTask) { - final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask; - if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) { - ackedUpdateTask.onAckTimeout(); + try { + ArrayList ackListeners = new ArrayList<>(); + if (newClusterState.nodes().localNodeMaster()) { + // only the master controls the version numbers + Builder builder = ClusterState.builder(newClusterState).incrementVersion(); + if (previousClusterState.routingTable() != newClusterState.routingTable()) { + builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build()); + } + if (previousClusterState.metaData() != newClusterState.metaData()) { + builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); + } + newClusterState = builder.build(); + for (UpdateTask task : proccessedListeners) { + if (task.listener instanceof AckedClusterStateTaskListener) { + final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener; + if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) { + ackedListener.onAckTimeout(); } else { try { - ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool); + ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(), threadPool)); } catch (EsRejectedExecutionException ex) { if (logger.isDebugEnabled()) { logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex); } //timeout straightaway, otherwise we could wait forever as the timeout thread has not started - ackedUpdateTask.onAckTimeout(); + ackedListener.onAckTimeout(); } } } } - - newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED); - - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n"); - sb.append(newClusterState.prettyPrint()); - logger.trace(sb.toString()); - } else if (logger.isDebugEnabled()) { - logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source); - } - - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState); - // new cluster state, notify all listeners - final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); - if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { - String summary = nodesDelta.shortSummary(); - if (summary.length() > 0) { - logger.info("{}, reason: {}", summary, source); - } - } - - // TODO, do this in parallel (and wait) - for (DiscoveryNode node : nodesDelta.addedNodes()) { - if (!nodeRequiresConnection(node)) { - continue; - } - try { - transportService.connectToNode(node); - } catch (Throwable e) { - // the fault detection will detect it as failed as well - logger.warn("failed to connect to node [" + node + "]", e); - } - } - - // if we are the master, publish the new state to all nodes - // we publish here before we send a notification to all the listeners, since if it fails - // we don't want to notify - if (newClusterState.nodes().localNodeMaster()) { - logger.debug("publishing cluster state version [{}]", newClusterState.version()); - try { - discoveryService.publish(clusterChangedEvent, ackListener); - } catch (Discovery.FailedToCommitClusterStateException t) { - logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version()); - updateTask.onFailure(source, t); - return; - } - } - - // update the current cluster state - clusterState = newClusterState; - logger.debug("set local cluster state to version {}", newClusterState.version()); - for (ClusterStateListener listener : preAppliedListeners) { - try { - listener.clusterChanged(clusterChangedEvent); - } catch (Exception ex) { - logger.warn("failed to notify ClusterStateListener", ex); - } - } - - for (DiscoveryNode node : nodesDelta.removedNodes()) { - try { - transportService.disconnectFromNode(node); - } catch (Throwable e) { - logger.warn("failed to disconnect to node [" + node + "]", e); - } - } - - newClusterState.status(ClusterState.ClusterStateStatus.APPLIED); - - for (ClusterStateListener listener : postAppliedListeners) { - try { - listener.clusterChanged(clusterChangedEvent); - } catch (Exception ex) { - logger.warn("failed to notify ClusterStateListener", ex); - } - } - - //manual ack only from the master at the end of the publish - if (newClusterState.nodes().localNodeMaster()) { - try { - ackListener.onNodeAck(newClusterState.nodes().localNode(), null); - } catch (Throwable t) { - logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode()); - } - } - - updateTask.clusterStateProcessed(source, previousClusterState, newClusterState); - - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); - logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID()); - warnAboutSlowTaskIfNeeded(executionTime, source); - } catch (Throwable t) { - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); - StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.stateUUID()).append("], source [").append(source).append("]\n"); - sb.append(newClusterState.nodes().prettyPrint()); - sb.append(newClusterState.routingTable().prettyPrint()); - sb.append(newClusterState.getRoutingNodes().prettyPrint()); - logger.warn(sb.toString(), t); - // TODO: do we want to call updateTask.onFailure here? } + final Discovery.AckListener ackListener = new DelegetingAckListener(ackListeners); + + newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED); + + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n"); + sb.append(newClusterState.prettyPrint()); + logger.trace(sb.toString()); + } else if (logger.isDebugEnabled()) { + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source); + } + + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState); + // new cluster state, notify all listeners + final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); + if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { + String summary = nodesDelta.shortSummary(); + if (summary.length() > 0) { + logger.info("{}, reason: {}", summary, source); + } + } + + // TODO, do this in parallel (and wait) + for (DiscoveryNode node : nodesDelta.addedNodes()) { + if (!nodeRequiresConnection(node)) { + continue; + } + try { + transportService.connectToNode(node); + } catch (Throwable e) { + // the fault detection will detect it as failed as well + logger.warn("failed to connect to node [" + node + "]", e); + } + } + + // if we are the master, publish the new state to all nodes + // we publish here before we send a notification to all the listeners, since if it fails + // we don't want to notify + if (newClusterState.nodes().localNodeMaster()) { + logger.debug("publishing cluster state version [{}]", newClusterState.version()); + try { + discoveryService.publish(clusterChangedEvent, ackListener); + } catch (Discovery.FailedToCommitClusterStateException t) { + logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version()); + proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t)); + return; + } + } + + // update the current cluster state + clusterState = newClusterState; + logger.debug("set local cluster state to version {}", newClusterState.version()); + for (ClusterStateListener listener : preAppliedListeners) { + try { + listener.clusterChanged(clusterChangedEvent); + } catch (Exception ex) { + logger.warn("failed to notify ClusterStateListener", ex); + } + } + + for (DiscoveryNode node : nodesDelta.removedNodes()) { + try { + transportService.disconnectFromNode(node); + } catch (Throwable e) { + logger.warn("failed to disconnect to node [" + node + "]", e); + } + } + + newClusterState.status(ClusterState.ClusterStateStatus.APPLIED); + + for (ClusterStateListener listener : postAppliedListeners) { + try { + listener.clusterChanged(clusterChangedEvent); + } catch (Exception ex) { + logger.warn("failed to notify ClusterStateListener", ex); + } + } + + //manual ack only from the master at the end of the publish + if (newClusterState.nodes().localNodeMaster()) { + try { + ackListener.onNodeAck(newClusterState.nodes().localNode(), null); + } catch (Throwable t) { + logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode()); + } + } + + for (UpdateTask task : proccessedListeners) { + task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState); + } + + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); + logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID()); + warnAboutSlowTaskIfNeeded(executionTime, source); + } catch (Throwable t) { + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); + StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.stateUUID()).append("], source [").append(source).append("]\n"); + sb.append(newClusterState.nodes().prettyPrint()); + sb.append(newClusterState.routingTable().prettyPrint()); + sb.append(newClusterState.getRoutingNodes().prettyPrint()); + logger.warn(sb.toString(), t); + // TODO: do we want to call updateTask.onFailure here? + } + + } + + class UpdateTask extends SourcePrioritizedRunnable { + + public final T task; + public final ClusterStateTaskConfig config; + public final ClusterStateTaskExecutor executor; + public final ClusterStateTaskListener listener; + public final AtomicBoolean processed = new AtomicBoolean(); + + UpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor executor, ClusterStateTaskListener listener) { + super(config.priority(), source); + this.task = task; + this.config = config; + this.executor = executor; + this.listener = listener; + } + + @Override + public void run() { + runTasksForExecutor(executor); } } @@ -729,13 +779,24 @@ public class InternalClusterService extends AbstractLifecycleComponent listeners; + + private DelegetingAckListener(List listeners) { + this.listeners = listeners; + } + @Override public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) { + for (Discovery.AckListener listener : listeners) { + listener.onNodeAck(node, t); + } } @Override public void onTimeout() { + throw new UnsupportedOperationException("no timeout delegation"); } } @@ -743,20 +804,20 @@ public class InternalClusterService extends AbstractLifecycleComponent ackTimeoutCallback; private Throwable lastFailure; - AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) { - this.ackedUpdateTask = ackedUpdateTask; + AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) { + this.ackedTaskListener = ackedTaskListener; this.clusterStateVersion = clusterStateVersion; this.nodes = nodes; int countDown = 0; for (DiscoveryNode node : nodes) { - if (ackedUpdateTask.mustAck(node)) { + if (ackedTaskListener.mustAck(node)) { countDown++; } } @@ -764,7 +825,7 @@ public class InternalClusterService extends AbstractLifecycleComponent joinCallbacksToRespondTo = new ArrayList<>(); private boolean nodeAdded = false; + public ProcessJoinsTask(Priority priority) { + super(priority); + } + @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder nodesBuilder; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index e5ec230fd66..03111d141ef 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -320,7 +320,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } catch (FailedToCommitClusterStateException t) { // cluster service logs a WARN message logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes()); - clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) { return rejoin(currentState, "failed to publish to min_master_nodes"); @@ -498,7 +498,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return; } if (localNodeMaster()) { - clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.id()); @@ -538,7 +538,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // nothing to do here... return; } - clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) { if (currentState.nodes().get(node.id()) == null) { @@ -587,7 +587,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // We only set the new value. If the master doesn't see enough nodes it will revoke it's mastership. return; } - clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) { // check if we have enough master nodes, if not, we need to move into joining the cluster again @@ -627,7 +627,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen logger.info("master_left [{}], reason [{}]", cause, masterNode, reason); - clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public boolean runOnlyOnMaster() { @@ -694,7 +694,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } void processNextPendingClusterState(String reason) { - clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", new ClusterStateUpdateTask(Priority.URGENT) { @Override public boolean runOnlyOnMaster() { return false; @@ -1059,7 +1059,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return; } logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get()); - clusterService.submitStateUpdateTask("ping from another master", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("ping from another master", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -1114,7 +1114,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen class RejoinClusterRequestHandler implements TransportRequestHandler { @Override public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception { - clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public boolean runOnlyOnMaster() { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index 96a865f4da4..1fb6c06a73c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -43,23 +43,14 @@ import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.*; /** * @@ -711,8 +702,8 @@ public class ClusterServiceIT extends ESIntegTestCase { .build(); internalCluster().startNode(settings); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); - BlockingTask block = new BlockingTask(); - clusterService.submitStateUpdateTask("test", Priority.IMMEDIATE, block); + BlockingTask block = new BlockingTask(Priority.IMMEDIATE); + clusterService.submitStateUpdateTask("test", block); int taskCount = randomIntBetween(5, 20); Priority[] priorities = Priority.values(); @@ -721,7 +712,7 @@ public class ClusterServiceIT extends ESIntegTestCase { CountDownLatch latch = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { Priority priority = priorities[randomIntBetween(0, priorities.length - 1)]; - clusterService.submitStateUpdateTask("test", priority, new PrioritiezedTask(priority, latch, tasks)); + clusterService.submitStateUpdateTask("test", new PrioritiezedTask(priority, latch, tasks)); } block.release(); @@ -730,9 +721,9 @@ public class ClusterServiceIT extends ESIntegTestCase { Priority prevPriority = null; for (PrioritiezedTask task : tasks) { if (prevPriority == null) { - prevPriority = task.priority; + prevPriority = task.priority(); } else { - assertThat(task.priority.sameOrAfter(prevPriority), is(true)); + assertThat(task.priority().sameOrAfter(prevPriority), is(true)); } } } @@ -947,6 +938,10 @@ public class ClusterServiceIT extends ESIntegTestCase { private static class BlockingTask extends ClusterStateUpdateTask { private final CountDownLatch latch = new CountDownLatch(1); + public BlockingTask(Priority priority) { + super(priority); + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { latch.await(); @@ -965,12 +960,11 @@ public class ClusterServiceIT extends ESIntegTestCase { private static class PrioritiezedTask extends ClusterStateUpdateTask { - private final Priority priority; private final CountDownLatch latch; private final List tasks; private PrioritiezedTask(Priority priority, CountDownLatch latch, List tasks) { - this.priority = priority; + super(priority); this.latch = latch; this.tasks = tasks; } diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index f9dbda217d0..b14792a2c33 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -25,11 +25,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -59,16 +55,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration; -import org.elasticsearch.test.disruption.BlockClusterStateProcessing; -import org.elasticsearch.test.disruption.IntermittentLongGCDisruption; -import org.elasticsearch.test.disruption.LongGCDisruption; -import org.elasticsearch.test.disruption.NetworkDelaysPartition; -import org.elasticsearch.test.disruption.NetworkDisconnectPartition; -import org.elasticsearch.test.disruption.NetworkPartition; -import org.elasticsearch.test.disruption.NetworkUnresponsivePartition; -import org.elasticsearch.test.disruption.ServiceDisruptionScheme; -import org.elasticsearch.test.disruption.SingleNodeDisruption; -import org.elasticsearch.test.disruption.SlowClusterStateProcessing; +import org.elasticsearch.test.disruption.*; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportException; @@ -78,31 +65,15 @@ import org.elasticsearch.transport.TransportService; import org.junit.Before; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) @ESIntegTestCase.SuppressLocalMode @@ -650,7 +621,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // but will be queued and once the old master node un-freezes it gets executed. // The old master node will send this update + the cluster state where he is flagged as master to the other // nodes that follow the new master. These nodes should ignore this update. - internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) throws Exception { return ClusterState.builder(currentState).build(); diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index cb6a979d38d..fc4dd4f6487 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -27,13 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; @@ -56,11 +50,7 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import java.io.IOException; import java.nio.file.Files; @@ -407,7 +397,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { // disable relocations when we do this, to make sure the shards are not relocated from node2 // due to rebalancing, and delete its content client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)).get(); - internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + internalCluster().getInstance(ClusterService.class, nonMasterNode).submitStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) throws Exception { IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test"); diff --git a/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index ffddcfc1619..51ae038ca0d 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -20,12 +20,7 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.PendingClusterTask; @@ -208,7 +203,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { private void addBlock() { // We should block after this task - add blocking cluster state update task - clusterService.submitStateUpdateTask("test_block", passThroughPriority, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("test_block", new ClusterStateUpdateTask(passThroughPriority) { @Override public ClusterState execute(ClusterState currentState) throws Exception { while(System.currentTimeMillis() < stopWaitingAt) { diff --git a/test-framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java b/test-framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java index 834e7d540c4..cb3d643f555 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java +++ b/test-framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.service.PendingClusterTask; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.transport.DummyTransportAddress; @@ -115,12 +114,12 @@ public class NoopClusterService implements ClusterService { } @Override - public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) { + public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) { } @Override - public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) { + public void submitStateUpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor executor, ClusterStateTaskListener listener) { } diff --git a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java index b13963961a0..3845a71c45e 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.logging.ESLogger; @@ -40,10 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; @@ -183,31 +179,34 @@ public class TestClusterService implements ClusterService { } @Override - synchronized public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) { - logger.debug("processing [{}]", source); - if (state().nodes().localNodeMaster() == false && updateTask.runOnlyOnMaster()) { - updateTask.onNoLongerMaster(source); - logger.debug("failed [{}], no longer master", source); - return; - } - ClusterState newState; - ClusterState previousClusterState = state; - try { - newState = updateTask.execute(previousClusterState); - } catch (Exception e) { - updateTask.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", e)); - return; - } - setStateAndNotifyListeners(newState); - if (updateTask instanceof ClusterStateUpdateTask) { - ((ClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newState); - } - logger.debug("finished [{}]", source); + public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) { + submitStateUpdateTask(source, null, updateTask, updateTask, updateTask); } @Override - public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) { - submitStateUpdateTask(source, Priority.NORMAL, updateTask); + synchronized public void submitStateUpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor executor, ClusterStateTaskListener listener) { + logger.debug("processing [{}]", source); + if (state().nodes().localNodeMaster() == false && executor.runOnlyOnMaster()) { + listener.onNoLongerMaster(source); + logger.debug("failed [{}], no longer master", source); + return; + } + ClusterStateTaskExecutor.Result result; + ClusterState previousClusterState = state; + try { + result = executor.execute(previousClusterState, Arrays.asList(task)); + } catch (Exception e) { + result = new ClusterStateTaskExecutor.Result(previousClusterState, Arrays.asList(e)); + } + if (result.failures.get(0) != null) { + listener.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", + result.failures.get(0))); + return; + } + setStateAndNotifyListeners(result.resultingState); + listener.clusterStateProcessed(source, previousClusterState, result.resultingState); + logger.debug("finished [{}]", source); + } @Override diff --git a/test-framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java b/test-framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java index 8154abfbd33..e318843e84f 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java +++ b/test-framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java @@ -58,7 +58,7 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption { boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1)); assert success : "startDisrupting called without waiting on stopDistrupting to complete"; final CountDownLatch started = new CountDownLatch(1); - clusterService.submitStateUpdateTask("service_disruption_block", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public boolean runOnlyOnMaster() { diff --git a/test-framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/test-framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java index 3c56f8305c0..b9c663686b1 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java +++ b/test-framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -102,7 +102,7 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { return false; } final AtomicBoolean stopped = new AtomicBoolean(false); - clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("service_disruption_delay", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public boolean runOnlyOnMaster() {