diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 1b43a33627b..8a7340f2ae4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -20,9 +20,7 @@ package org.elasticsearch.cluster.action.shard; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; @@ -46,7 +44,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -64,7 +61,6 @@ public class ShardStateAction extends AbstractComponent { private final RoutingService routingService; private final BlockingQueue startedShardsQueue = ConcurrentCollections.newBlockingQueue(); - private final BlockingQueue failedShardQueue = ConcurrentCollections.newBlockingQueue(); @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, @@ -141,54 +137,52 @@ public class ShardStateAction extends AbstractComponent { }); } + private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); + private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); - failedShardQueue.add(shardRoutingEntry); - clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", - new ClusterStateUpdateTask(Priority.HIGH) { + clusterService.submitStateUpdateTask( + "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", + shardRoutingEntry, + ClusterStateTaskConfig.build(Priority.HIGH), + shardFailedClusterStateHandler, + shardFailedClusterStateHandler); + } - @Override - public ClusterState execute(ClusterState currentState) { - if (shardRoutingEntry.processed) { - return currentState; + class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + BatchResult.Builder builder = BatchResult.builder(); + ClusterState accumulator = ClusterState.builder(currentState).build(); + for (ShardRoutingEntry task : tasks) { + task.processed = true; + try { + RoutingAllocation.Result result = allocationService.applyFailedShard( + currentState, + new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); + builder.success(task); + if (result.changed()) { + accumulator = ClusterState.builder(accumulator).routingResult(result).build(); + } + } catch (Throwable t) { + builder.failure(task, t); } - - List shardRoutingEntries = new ArrayList<>(); - failedShardQueue.drainTo(shardRoutingEntries); - - // nothing to process (a previous event has processed it already) - if (shardRoutingEntries.isEmpty()) { - return currentState; - } - - List shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - - // mark all entries as processed - for (ShardRoutingEntry entry : shardRoutingEntries) { - entry.processed = true; - shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure)); - } - - RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied); - if (!routingResult.changed()) { - return currentState; - } - return ClusterState.builder(currentState).routingResult(routingResult).build(); } + return builder.build(accumulator); + } - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) { logger.trace("unassigned shards after shard failures. scheduling a reroute."); routingService.reroute("unassigned shards after shard failures, scheduling a reroute"); } - } - }); + } + + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure during [{}]", t, source); + } } private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f819d6fde0a..e19d51981a6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -98,7 +98,11 @@ public class AllocationService extends AbstractComponent { } public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { - return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null))); + return applyFailedShard(clusterState, new FailedRerouteAllocation.FailedShard(failedShard, null, null)); + } + + public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, FailedRerouteAllocation.FailedShard failedShard) { + return applyFailedShards(clusterState, Collections.singletonList(failedShard)); } /**