Use allocation ids to prevent repeated recovery of failed shards

Closes #16346
This commit is contained in:
Yannick Welsch 2016-02-01 17:12:20 +01:00
parent 26f77eb70d
commit cd53772178
1 changed files with 19 additions and 51 deletions

View File

@ -39,13 +39,11 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
@ -93,26 +91,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {};
// a map of mappings type we have seen per index due to cluster state
// we need this so we won't remove types automatically created as part of the indexing process
private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
// a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, ShardRouting> failedShards = ConcurrentCollections.newConcurrentMap();
private final RestoreService restoreService;
private final RepositoriesService repositoriesService;
static class FailedShard {
public final long version;
public final long timestamp;
FailedShard(long version) {
this.version = version;
this.timestamp = System.currentTimeMillis();
}
}
private final Object mutex = new Object();
private final FailedShardHandler failedShardHandler = new FailedShardHandler();
@ -431,11 +415,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
RoutingTable routingTable = event.state().routingTable();
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
if (routingNode == null) {
failedShards.clear();
return;
}
DiscoveryNodes nodes = event.state().nodes();
for (final ShardRouting shardRouting : routingNode) {
@ -507,40 +486,29 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void cleanFailedShards(final ClusterChangedEvent event) {
RoutingTable routingTable = event.state().routingTable();
RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
if (routingNode == null) {
failedShards.clear();
return;
}
DiscoveryNodes nodes = event.state().nodes();
long now = System.currentTimeMillis();
String localNodeId = nodes.localNodeId();
Iterator<Map.Entry<ShardId, FailedShard>> iterator = failedShards.entrySet().iterator();
shards:
while (iterator.hasNext()) {
Map.Entry<ShardId, FailedShard> entry = iterator.next();
FailedShard failedShard = entry.getValue();
IndexRoutingTable indexRoutingTable = routingTable.index(entry.getKey().getIndex());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(entry.getKey().id());
if (shardRoutingTable != null) {
for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
if (localNodeId.equals(shardRouting.currentNodeId())) {
// we have a timeout here just to make sure we don't have dangled failed shards for some reason
// its just another safely layer
if (shardRouting.version() == failedShard.version && ((now - failedShard.timestamp) < TimeValue.timeValueMinutes(60).millis())) {
// It's the same failed shard - keep it if it hasn't timed out
continue shards;
} else {
// Different version or expired, remove it
break;
}
}
}
}
RoutingTable routingTable = event.state().routingTable();
for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<ShardId, ShardRouting> entry = iterator.next();
ShardId failedShardId = entry.getKey();
ShardRouting failedShardRouting = entry.getValue();
IndexRoutingTable indexRoutingTable = routingTable.index(failedShardId.getIndex());
if (indexRoutingTable == null) {
iterator.remove();
continue;
}
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(failedShardId.id());
if (shardRoutingTable == null) {
iterator.remove();
continue;
}
if (shardRoutingTable.assignedShards().stream().noneMatch(shr -> shr.isSameAllocation(failedShardRouting))) {
iterator.remove();
}
iterator.remove();
}
}
@ -788,7 +756,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void sendFailShard(ShardRouting shardRouting, String indexUUID, String message, @Nullable Throwable failure) {
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
failedShards.put(shardRouting.shardId(), shardRouting);
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndexName(), shardRouting.getId(), message);