Reduced the number of ClusterStateUpdateTask variants

In the past ClusterStateUpdateTask was an interface and we had various derived marker interfaces to control behavior. Since then we moved ClusterStateUpdateTask to be an abstract class but we kept the old hierarchy of implementations. All of those (but the AckedClusterStateUpdateTask) can be folded into ClusterStateUpdateTask, adding correct default behavior.

Closes #13735
This commit is contained in:
Boaz Leskes 2015-09-23 12:05:25 +02:00
parent d6c1880f08
commit d62f864317
29 changed files with 199 additions and 358 deletions

View File

@ -73,7 +73,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) {
if (request.waitForEvents() != null) {
final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.unit.TimeValue;
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* all the nodes have acknowledged a cluster state update request
*/
public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClusterStateUpdateTask {
public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask {
private final ActionListener<Response> listener;
private final AckedRequest request;
@ -73,11 +73,6 @@ public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClust
listener.onFailure(t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
/**
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
*/

View File

@ -1,32 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
/**
* This is a marker interface to indicate that the task should be executed
* even if the current node is not a master.
*/
public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
@Override
public boolean runOnlyOnMaster() {
return false;
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
/**
@ -51,4 +53,22 @@ abstract public class ClusterStateUpdateTask {
public void onNoLongerMaster(String source) {
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
}
/**
* Called when the result of the {@link #execute(ClusterState)} have been processed
* properly by all listeners.
*/
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}. May return null to indicate no timeout is needed (default).
*/
@Nullable
public TimeValue timeout() {
return null;
}
}

View File

@ -1,31 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
/**
* A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
* {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
*/
abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {
@Override
public boolean runOnlyOnMaster() {
return false;
}
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
/**
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* the cluster state update has been processed.
*/
public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
/**
* Called when the result of the {@link #execute(ClusterState)} have been processed
* properly by all listeners.
*/
public abstract void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.common.unit.TimeValue;
/**
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
* a timeout.
*/
abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}
*/
abstract public TimeValue timeout();
}

View File

@ -23,11 +23,10 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -37,7 +36,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
@ -129,7 +127,7 @@ public class ShardStateAction extends AbstractComponent {
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {

View File

@ -22,7 +22,7 @@ package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -100,7 +100,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
private void deleteIndex(final Request request, final Listener userListener, Semaphore mdLock) {
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, userListener);
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public TimeValue timeout() {

View File

@ -23,7 +23,7 @@ import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.ValidationException;
@ -36,13 +36,7 @@ import org.elasticsearch.indices.IndexTemplateAlreadyExistsException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
* Service responsible for submitting index templates updates
@ -62,7 +56,7 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
}
public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
@ -149,7 +143,7 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
}
final IndexTemplateMetaData template = templateBuilder.build();
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public TimeValue timeout() {

View File

@ -25,7 +25,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterState
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
@ -43,14 +43,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;
import org.elasticsearch.percolator.PercolatorService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
* Service responsible for submitting mapping changes
*/
@ -301,7 +294,7 @@ public class MetaDataMappingService extends AbstractComponent {
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
}
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;
@Override

View File

@ -41,8 +41,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.node.settings.NodeSettingsService;
@ -275,15 +275,14 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
try {
final UpdateTask task = new UpdateTask(source, priority, updateTask);
if (updateTask instanceof TimeoutClusterStateUpdateTask) {
final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
if (updateTask.timeout() != null) {
updateTasksExecutor.execute(task, threadPool.scheduler(), updateTask.timeout(), new Runnable() {
@Override
public void run() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
updateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(updateTask.timeout(), task.source()));
}
});
}
@ -401,9 +400,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
//no need to wait for ack if nothing changed, the update can be counted as acknowledged
((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
}
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}
updateTask.clusterStateProcessed(source, previousClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
@ -526,9 +523,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}
updateTask.clusterStateProcessed(source, previousClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());

View File

@ -124,7 +124,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
// we are the first master (and the master)
master = true;
final LocalDiscovery master = firstMaster;
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
@ -150,7 +156,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
} else if (firstMaster != null) {
// update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = firstMaster.clusterService.state();
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata
@ -166,7 +177,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
// tell the master to send the fact that we are here
final LocalDiscovery master = firstMaster;
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() {
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
@ -230,7 +246,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
final LocalDiscovery master = firstMaster;
master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() {
master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
@ -351,7 +372,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.supersedes(nodeSpecificClusterState)) {

View File

@ -21,8 +21,7 @@ package org.elasticsearch.discovery.zen;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -133,7 +132,13 @@ public class NodeJoinController extends AbstractComponent {
/** utility method to fail the given election context under the cluster state thread */
private void failContext(final ElectionContext context, final String reason, final Throwable throwable) {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
context.onFailure(throwable);
@ -343,7 +348,7 @@ public class NodeJoinController extends AbstractComponent {
* Processes any pending joins via a ClusterState update task.
* Note: this task automatically fails (and fails all pending joins) if the current node is not marked as master
*/
class ProcessJoinsTask extends ProcessedClusterStateUpdateTask {
class ProcessJoinsTask extends ClusterStateUpdateTask {
private final List<MembershipAction.JoinCallback> joinCallbacksToRespondTo = new ArrayList<>();
private boolean nodeAdded = false;

View File

@ -210,7 +210,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("initial_join", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
@ -397,7 +403,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// finalize join through the cluster state update thread
final DiscoveryNode finalMasterNode = masterNode;
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (!success) {
@ -523,7 +534,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().get(node.id()) == null) {
@ -570,7 +581,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// We only set the new value. If the master doesn't see enough nodes it will revoke it's mastership.
return;
}
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// check if we have enough master nodes, if not, we need to move into joining the cluster again
@ -610,7 +621,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.info("master_left [{}], reason [{}]", masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
@ -671,7 +688,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
void processNextPendingClusterState(String reason) {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
ClusterState newClusterState = null;
@Override
@ -1086,7 +1108,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
class RejoinClusterRequestHandler implements TransportRequestHandler<RejoinClusterRequest> {
@Override
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) {
try {
@ -1097,11 +1125,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "received a request to rejoin the cluster from [" + request.fromNodeId + "]");
}
@Override
public void onNoLongerMaster(String source) {
// already logged
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
@ -342,21 +342,23 @@ public class MasterFaultDetection extends FaultDetection {
if (!nodes.localNodeMaster() || !nodes.nodeExists(request.nodeId)) {
logger.trace("checking ping from [{}] under a cluster state thread", request.nodeId);
clusterService.submitStateUpdateTask("master ping (from: [" + request.nodeId + "])", new ProcessedClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("master ping (from: [" + request.nodeId + "])", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// if we are no longer master, fail...
DiscoveryNodes nodes = currentState.nodes();
if (!nodes.localNodeMaster()) {
throw new NotMasterException("local node is not master");
}
if (!nodes.nodeExists(request.nodeId)) {
throw new NodeDoesNotExistOnMasterException();
}
return currentState;
}
@Override
public void onNoLongerMaster(String source) {
onFailure(source, new NotMasterException("local node is not master"));
}
@Override
public void onFailure(String source, @Nullable Throwable t) {
if (t == null) {

View File

@ -214,7 +214,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
@Override
public void onSuccess(final ClusterState recoveredState) {
logger.trace("successful state recovery, importing cluster state...");
clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("local-gateway-elected-state", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
assert currentState.metaData().indices().isEmpty();

View File

@ -21,7 +21,7 @@ package org.elasticsearch.gateway;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -112,7 +112,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
for (int i = 0; i < request.indices.length; i++) {
indexNames[i] = request.indices[i].index();
}
clusterService.submitStateUpdateTask("allocation dangled indices " + Arrays.toString(indexNames), new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("allocation dangled indices " + Arrays.toString(indexNames), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.blocks().disableStatePersistence()) {

View File

@ -287,7 +287,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
return;
}
clusterService.submitStateUpdateTask("indices_store ([" + shardId + "] active fully on other nodes)", new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("indices_store ([" + shardId + "] active fully on other nodes)", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (clusterState.getVersion() != currentState.getVersion()) {

View File

@ -27,28 +27,11 @@ import com.google.common.collect.ImmutableSet;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
@ -70,36 +53,15 @@ import org.elasticsearch.index.shard.StoreRecoveryService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_UPGRADED;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
/**
@ -211,7 +173,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
// Now we can start the actual restore process by adding shards to be recovered in the cluster state
// and updating cluster metadata (global and index) as needed
clusterService.submitStateUpdateTask(request.cause(), new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
RestoreInfo restoreInfo = null;
@Override
@ -525,7 +487,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
logger.trace("received updated snapshot restore state [{}]", request);
updatedSnapshotStateQueue.add(request);
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("update snapshot state", new ClusterStateUpdateTask() {
private final List<UpdateIndexShardRestoreStatusRequest> drainedRequests = new ArrayList<>();
private Map<SnapshotId, Tuple<RestoreInfo, Map<ShardId, ShardRestoreStatus>>> batchedRestoreInfo = null;

View File

@ -24,21 +24,10 @@ import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -61,14 +50,7 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
@ -182,7 +164,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
public void createSnapshot(final SnapshotRequest request, final CreateSnapshotListener listener) {
final SnapshotId snapshotId = new SnapshotId(request.repository(), request.name());
validate(snapshotId);
clusterService.submitStateUpdateTask(request.cause(), new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
private SnapshotsInProgress.Entry newSnapshot = null;
@ -303,7 +285,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
endSnapshot(snapshot);
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshotId().getSnapshot() + "]", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshotId().getSnapshot() + "]", new ClusterStateUpdateTask() {
boolean accepted = false;
SnapshotsInProgress.Entry updatedSnapshot;
String failure = null;
@ -805,7 +787,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param t exception if snapshot failed
*/
private void removeSnapshotFromClusterState(final SnapshotId snapshotId, final SnapshotInfo snapshot, final Throwable t) {
clusterService.submitStateUpdateTask("remove snapshot metadata", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
@ -860,7 +842,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
*/
public void deleteSnapshot(final SnapshotId snapshotId, final DeleteSnapshotListener listener) {
validate(snapshotId);
clusterService.submitStateUpdateTask("delete snapshot", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask() {
boolean waitForSnapshot = false;

View File

@ -22,11 +22,7 @@ package org.elasticsearch.tribe;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -50,11 +46,7 @@ import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.rest.RestStatus;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@ -217,7 +209,12 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
@Override
public void clusterChanged(final ClusterChangedEvent event) {
logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState tribeState = event.state();

View File

@ -42,12 +42,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -55,11 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.ESIntegTestCase.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
/**
*
@ -100,7 +91,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
final CountDownLatch timedOut = new CountDownLatch(1);
final AtomicBoolean executeCalled = new AtomicBoolean();
clusterService1.submitStateUpdateTask("test2", new TimeoutClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return TimeValue.timeValueMillis(2);
@ -325,7 +316,12 @@ public class ClusterServiceIT extends ESIntegTestCase {
taskFailed[0] = true;
final CountDownLatch latch2 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
taskFailed[0] = false;
@ -525,7 +521,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
invoked1.await();
final CountDownLatch invoked2 = new CountDownLatch(9);
for (int i = 2; i <= 10; i++) {
clusterService.submitStateUpdateTask(Integer.toString(i), new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
@ -758,7 +754,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(4);
clusterService1.submitStateUpdateTask("test1", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
@ -774,7 +770,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
fail();
}
});
clusterService1.submitStateUpdateTask("test2", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
@ -790,7 +786,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build();
@ -808,7 +804,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test4", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
@ -851,7 +847,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
try {
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test1", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
@ -874,7 +870,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, "10ms")));
clusterService1.submitStateUpdateTask("test2", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
@ -891,7 +887,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
@ -908,7 +904,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
fail();
}
});
clusterService1.submitStateUpdateTask("test4", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
@ -927,7 +923,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test5", new ProcessedClusterStateUpdateTask() {
clusterService1.submitStateUpdateTask("test5", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;

View File

@ -372,7 +372,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
final AtomicReference<Throwable> failure = new AtomicReference<>();
logger.debug("--> submitting for cluster state to be rejected");
final ClusterService masterClusterService = internalCluster().clusterService(master);
masterClusterService.submitStateUpdateTask("test", new ProcessedClusterStateUpdateTask() {
masterClusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();

View File

@ -35,7 +35,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
@ -81,18 +81,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import static org.elasticsearch.test.ESIntegTestCase.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
/**
*/
@ -272,7 +262,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
clusterService.submitStateUpdateTask("test", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return updater.execute(currentState);

View File

@ -29,11 +29,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.*;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
@ -43,7 +39,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
@ -83,24 +79,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.shard.IndexShard.INDEX_REFRESH_INTERVAL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateMissing;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
@ -1839,7 +1819,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> emulate an orphan snapshot");
clusterService.submitStateUpdateTask("orphan snapshot test", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
@ -200,8 +199,8 @@ public class TestClusterService implements ClusterService {
return;
}
setStateAndNotifyListeners(newState);
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newState);
if (updateTask instanceof ClusterStateUpdateTask) {
((ClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newState);
}
logger.debug("finished [{}]", source);
}

View File

@ -20,7 +20,7 @@ package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.InternalTestCluster;
@ -58,7 +58,12 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption {
boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1));
assert success : "startDisrupting called without waiting on stopDistrupting to complete";
final CountDownLatch started = new CountDownLatch(1);
clusterService.submitStateUpdateTask("service_disruption_block", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("service_disruption_block", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {

View File

@ -20,7 +20,7 @@ package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.InternalTestCluster;
@ -102,7 +102,12 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
return false;
}
final AtomicBoolean stopped = new AtomicBoolean(false);
clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {