From ed2740a50a06b6afa5c82cbb5cbd1df19a8eb84c Mon Sep 17 00:00:00 2001
From: Igor Motov
Date: Mon, 19 Aug 2013 14:30:07 -0400
Subject: [PATCH] Don't recover a shard if it just failed on this node and
 wasn't reassigned to this node by master yet.

When recovery of a shard fails on a node, the node sends a notification to the
master with information about the failure. During the period between the shard
failure and the time when the notification about the failure reaches the
master, any changes in shard allocations can trigger the node with the failed
shard to allocate this shard again. This allocation (especially if successful)
creates a ripple effect of the shard going through failure/started states in
order to match the delayed states processed by the master. Under certain
conditions, a node involved in this process might generate warning messages:
"marked shard as started, but shard has not been created, mark shard as
failed". This fix makes sure that nodes keep track of failed shard allocations
and will not try to allocate such shards repeatedly while waiting for the
failure notification to be processed by the master.
---
 .../cluster/IndicesClusterStateService.java | 43 +++++++++++++++++--
 1 file changed, 40 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 5aa262561ec..becabd7af96 100644
--- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -63,9 +63,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.indices.recovery.StartRecoveryRequest;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
 import static com.google.common.collect.Maps.newHashMap;
@@ -92,6 +90,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> {
 
     private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
 
+    // a list of shards that failed during recovery
+    // we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
+    private final ConcurrentMap<ShardId, Long> failedShards = ConcurrentCollections.newConcurrentMap();
+
     private final Object mutex = new Object();
     private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
 
@@ -496,10 +498,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> {
+        Iterator<Map.Entry<ShardId, Long>> iterator = failedShards.entrySet().iterator();
+        shards:
+        while (iterator.hasNext()) {
+            Map.Entry<ShardId, Long> entry = iterator.next();
+            IndexRoutingTable indexRoutingTable = routingTable.index(entry.getKey().getIndex());
+            if (indexRoutingTable != null) {
+                IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(entry.getKey().id());
+                if (shardRoutingTable != null) {
+                    for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
+                        if (localNodeId.equals(shardRouting.currentNodeId())) {
+                            if (shardRouting.version() == entry.getValue()) {
+                                // It's the same failed shard - keep it
+                                continue shards;
+                            } else {
+                                // Different version - remove it
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+            iterator.remove();
+        }
+    }
+
     private void applyInitializingShard(final RoutingTable routingTable, final DiscoveryNodes nodes, final IndexShardRoutingTable indexShardRouting, final ShardRouting shardRouting) throws ElasticSearchException {
         final IndexService indexService = indicesService.indexService(shardRouting.index());
         if (indexService == null) {
@@ -571,6 +602,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> {
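
The bookkeeping idea behind the patch can be summarized outside of the Elasticsearch codebase as follows. This is a minimal, hypothetical sketch, not the actual IndicesClusterStateService API: the FailedShardTrackerSketch class, its String shard keys and method names are made up for illustration; only the idea of keying locally failed recoveries by shard and routing version comes from the patch above.

// Minimal sketch: remember the routing version at which a shard failed locally,
// skip recovering that same (shard, version) pair again, and forget the entry
// once the master has reassigned the shard (i.e. the routing version changed).
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class FailedShardTrackerSketch {

    // shard id -> routing version observed when the local recovery failed
    private final ConcurrentMap<String, Long> failedShards = new ConcurrentHashMap<>();

    // called when a local recovery fails, before the failure notification reaches the master
    void markFailed(String shardId, long routingVersion) {
        failedShards.put(shardId, routingVersion);
    }

    // called before starting recovery for a shard seen in a cluster state update
    boolean shouldSkipRecovery(String shardId, long routingVersion) {
        Long failedVersion = failedShards.get(shardId);
        // same shard at the same routing version: the master has not reacted to the
        // failure yet, so starting recovery again would only repeat the failure cycle
        return failedVersion != null && failedVersion == routingVersion;
    }

    // called on every cluster state update: forget an entry once the shard is no longer
    // assigned to this node or the master reassigned it under a different version
    void clean(String shardId, Long versionAssignedToThisNode) {
        Long failedVersion = failedShards.get(shardId);
        if (failedVersion == null) {
            return;
        }
        if (versionAssignedToThisNode == null || failedVersion.longValue() != versionAssignedToThisNode.longValue()) {
            failedShards.remove(shardId);
        }
    }

    public static void main(String[] args) {
        FailedShardTrackerSketch tracker = new FailedShardTrackerSketch();

        tracker.markFailed("index1[0]", 5);                              // recovery failed at version 5
        System.out.println(tracker.shouldSkipRecovery("index1[0]", 5));  // true: same version, skip
        tracker.clean("index1[0]", 6L);                                  // master reassigned, version bumped
        System.out.println(tracker.shouldSkipRecovery("index1[0]", 6));  // false: safe to recover again
    }
}

In the patch itself the failedShards map plays the role of this tracker, and the loop visible in the diff is the cleanup step: an entry is kept only while the same routing version is still assigned to the local node, and removed otherwise.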