From 89a09b9bedc7665abc9b6c5dcadb3f487a6626de Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 15 Dec 2015 21:45:47 -0500 Subject: [PATCH] Master should wait on cluster state publication when failing a shard When a client sends a request to fail a shard to the master, the current behavior is that the master will submit the cluster state update task and then immediately send a successful response back to the client; additionally, if there are any failures while processing the cluster state update task to fail the shard, then the client will never be notified of these failures. This commit modifies the master behavior when handling requests to fail a shard. In particular, the master will now wait until successful publication of the cluster state update before notifying the request client that the shard is marked as failed; additionally, the client is now notified of any failures during the execution of the cluster state update task. Relates #14252 --- .../action/shard/ShardStateAction.java | 73 +++++++++++++------ 1 file changed, 51 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index c2ac791aa16..3a01ced6ebf 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; @@ -53,6 +54,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -113,7 +115,7 @@ public class ShardStateAction extends AbstractComponent { @Override public void handleException(TransportException exp) { - logger.warn("failed to send failed shard to {}", exp, masterNode); + logger.warn("unexpected failure while sending request to [{}] to fail shard [{}]", exp, masterNode, shardRoutingEntry); listener.onShardFailedFailure(masterNode, exp); } }); @@ -122,22 +124,62 @@ public class ShardStateAction extends AbstractComponent { private class ShardFailedTransportHandler implements TransportRequestHandler { @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - handleShardFailureOnMaster(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + handleShardFailureOnMaster(request, new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure while failing shard [{}]", t, request.shardRouting); + try { + channel.sendResponse(t); + } catch (Throwable channelThrowable) { + logger.warn("failed to send failure [{}] while failing shard [{}]", channelThrowable, t, request.shardRouting); + } + } + + @Override + public void onNoLongerMaster(String source) { + logger.error("no longer master while failing shard [{}]", request.shardRouting); + try { + channel.sendResponse(new NotMasterException(source)); + } catch (Throwable channelThrowable) { + logger.warn("failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { + int numberOfUnassignedShards = newState.getRoutingNodes().unassigned().size(); + if (oldState != newState && numberOfUnassignedShards > 0) { + String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shard [%s]", numberOfUnassignedShards, request.shardRouting); + if (logger.isTraceEnabled()) { + logger.trace(reason + ", scheduling a reroute"); + } + routingService.reroute(reason); + } + } finally { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (Throwable channelThrowable) { + logger.warn("failed to send response while failing shard [{}]", channelThrowable, request.shardRouting); + } + } + } + } + ); } } - class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { + class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor { @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { BatchResult.Builder batchResultBuilder = BatchResult.builder(); - List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); + List failedShards = new ArrayList<>(tasks.size()); for (ShardRoutingEntry task : tasks) { - shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); + failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); } ClusterState maybeUpdatedState = currentState; try { - RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied); + RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards); if (result.changed()) { maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); } @@ -147,31 +189,18 @@ public class ShardStateAction extends AbstractComponent { } return batchResultBuilder.build(maybeUpdatedState); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) { - logger.trace("unassigned shards after shard failures. scheduling a reroute."); - routingService.reroute("unassigned shards after shard failures, scheduling a reroute"); - } - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } } private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); - private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { + private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) { logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); clusterService.submitStateUpdateTask( "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", shardRoutingEntry, ClusterStateTaskConfig.build(Priority.HIGH), shardFailedClusterStateHandler, - shardFailedClusterStateHandler); + listener); } public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {