diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index c2ac791aa16..3a01ced6ebf 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; @@ -53,6 +54,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -113,7 +115,7 @@ public class ShardStateAction extends AbstractComponent { @Override public void handleException(TransportException exp) { - logger.warn("failed to send failed shard to {}", exp, masterNode); + logger.warn("unexpected failure while sending request to [{}] to fail shard [{}]", exp, masterNode, shardRoutingEntry); listener.onShardFailedFailure(masterNode, exp); } }); @@ -122,22 +124,62 @@ public class ShardStateAction extends AbstractComponent { private class ShardFailedTransportHandler implements TransportRequestHandler { @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - handleShardFailureOnMaster(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + handleShardFailureOnMaster(request, new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure while failing shard [{}]", t, request.shardRouting); + try { + channel.sendResponse(t); + } catch (Throwable channelThrowable) { + logger.warn("failed to send failure [{}] while failing shard [{}]", channelThrowable, t, request.shardRouting); + } + } + + @Override + public void onNoLongerMaster(String source) { + logger.error("no longer master while failing shard [{}]", request.shardRouting); + try { + channel.sendResponse(new NotMasterException(source)); + } catch (Throwable channelThrowable) { + logger.warn("failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { + int numberOfUnassignedShards = newState.getRoutingNodes().unassigned().size(); + if (oldState != newState && numberOfUnassignedShards > 0) { + String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shard [%s]", numberOfUnassignedShards, request.shardRouting); + if (logger.isTraceEnabled()) { + logger.trace(reason + ", scheduling a reroute"); + } + routingService.reroute(reason); + } + } finally { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (Throwable channelThrowable) { + logger.warn("failed to send response while failing shard [{}]", channelThrowable, request.shardRouting); + } + } + } + } + ); } } - class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { + class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor { @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { BatchResult.Builder batchResultBuilder = BatchResult.builder(); - List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); + List failedShards = new ArrayList<>(tasks.size()); for (ShardRoutingEntry task : tasks) { - shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); + failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); } ClusterState maybeUpdatedState = currentState; try { - RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied); + RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards); if (result.changed()) { maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); } @@ -147,31 +189,18 @@ public class ShardStateAction extends AbstractComponent { } return batchResultBuilder.build(maybeUpdatedState); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) { - logger.trace("unassigned shards after shard failures. scheduling a reroute."); - routingService.reroute("unassigned shards after shard failures, scheduling a reroute"); - } - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } } private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); - private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { + private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) { logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); clusterService.submitStateUpdateTask( "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", shardRoutingEntry, ClusterStateTaskConfig.build(Priority.HIGH), shardFailedClusterStateHandler, - shardFailedClusterStateHandler); + listener); } public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {