diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 7fc12eb8bab..f5afec1d5e3 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -39,13 +39,11 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.Callback;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.index.IndexService;
@@ -93,26 +91,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
     private final NodeIndexDeletedAction nodeIndexDeletedAction;
     private final NodeMappingRefreshAction nodeMappingRefreshAction;
 
-    // a map of mappings type we have seen per index due to cluster state
-    // we need this so we won't remove types automatically created as part of the indexing process
-    private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
-
     // a list of shards that failed during recovery
     // we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
-    private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
+    private final ConcurrentMap<ShardId, ShardRouting> failedShards = ConcurrentCollections.newConcurrentMap();
 
     private final RestoreService restoreService;
     private final RepositoriesService repositoriesService;
 
-    static class FailedShard {
-        public final long version;
-        public final long timestamp;
-
-        FailedShard(long version) {
-            this.version = version;
-            this.timestamp = System.currentTimeMillis();
-        }
-    }
-
     private final Object mutex = new Object();
     private final FailedShardHandler failedShardHandler = new FailedShardHandler();
@@ -431,11 +415,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
-        Iterator<Map.Entry<ShardId, FailedShard>> iterator = failedShards.entrySet().iterator();
-        shards:
-        while (iterator.hasNext()) {
-            Map.Entry<ShardId, FailedShard> entry = iterator.next();
-            FailedShard failedShard = entry.getValue();
-            IndexRoutingTable indexRoutingTable = routingTable.index(entry.getKey().getIndex());
-            if (indexRoutingTable != null) {
-                IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(entry.getKey().id());
-                if (shardRoutingTable != null) {
-                    for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
-                        if (localNodeId.equals(shardRouting.currentNodeId())) {
-                            // we have a timeout here just to make sure we don't have dangled failed shards for some reason
-                            // its just another safely layer
-                            if (shardRouting.version() == failedShard.version && ((now - failedShard.timestamp) < TimeValue.timeValueMinutes(60).millis())) {
-                                // It's the same failed shard - keep it if it hasn't timed out
-                                continue shards;
-                            } else {
-                                // Different version or expired, remove it
-                                break;
-                            }
-                        }
-                    }
-                }
+        RoutingTable routingTable = event.state().routingTable();
+        for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) {
+            Map.Entry<ShardId, ShardRouting> entry = iterator.next();
+            ShardId failedShardId = entry.getKey();
+            ShardRouting failedShardRouting = entry.getValue();
+            IndexRoutingTable indexRoutingTable = routingTable.index(failedShardId.getIndex());
+            if (indexRoutingTable == null) {
+                iterator.remove();
+                continue;
+            }
+            IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(failedShardId.id());
+            if (shardRoutingTable == null) {
+                iterator.remove();
+                continue;
+            }
+            if (shardRoutingTable.assignedShards().stream().noneMatch(shr -> shr.isSameAllocation(failedShardRouting))) {
+                iterator.remove();
             }
-            iterator.remove();
         }
     }
@@ -788,7 +756,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
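
The patch replaces the old retention heuristic for entries in failedShards (keep while a shard with the same version is still assigned to this node, with a 60-minute timeout as an extra safety layer) with an identity check: an entry now survives only while the routing table still assigns a shard with the same allocation id, via ShardRouting#isSameAllocation. Once the master allocates a replacement under a fresh allocation id, or the index/shard disappears from the routing table entirely, the entry is dropped, so no timeout is needed. Below is a rough, self-contained Java sketch of that rule, not Elasticsearch code: Routing and the plain String ids are simplified stand-ins for ShardRouting and ShardId.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FailedShardCleanupSketch {

    // Stand-in for ShardRouting: only the allocation id matters for this check.
    record Routing(String shardId, String allocationId) {
        boolean isSameAllocation(Routing other) {
            return allocationId.equals(other.allocationId);
        }
    }

    public static void main(String[] args) {
        // Failed shards being tracked, keyed by shard id (mirrors the failedShards map).
        Map<String, Routing> failedShards = new HashMap<>();
        failedShards.put("idx[0]", new Routing("idx[0]", "alloc-1"));
        failedShards.put("idx[1]", new Routing("idx[1]", "alloc-2"));

        // Assigned shards per shard id in the latest routing table: idx[0] has been
        // re-allocated under a fresh allocation id, idx[1] still has the failed one.
        Map<String, List<Routing>> assignedShards = Map.of(
                "idx[0]", List.of(new Routing("idx[0]", "alloc-9")),
                "idx[1]", List.of(new Routing("idx[1]", "alloc-2")));

        // Same rule as the patched cleanupFailedShards: drop an entry as soon as the
        // routing table no longer assigns the exact allocation that failed.
        failedShards.entrySet().removeIf(entry -> {
            List<Routing> assigned = assignedShards.get(entry.getKey());
            return assigned == null
                    || assigned.stream().noneMatch(shr -> shr.isSameAllocation(entry.getValue()));
        });

        System.out.println(failedShards.keySet()); // prints [idx[1]]
    }
}

In the sketch, the assigned == null case stands in for the patch's two early iterator.remove()-and-continue branches, where the whole index or shard routing table has vanished from the cluster state.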