Don't recover a shard if it just failed on this node and wasn't reassigned to this node by master yet.

additional places where we should track failed shards, and call clean as part of the top level calls
This commit is contained in:
Shay Banon 2013-08-28 14:11:49 +02:00
parent ed2740a50a
commit 25d42e5caf
1 changed files with 39 additions and 13 deletions

View File

@ -92,7 +92,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
private final ConcurrentMap<ShardId, Long> failedShards = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
static class FailedShard {
public final long version;
public final long timestamp;
FailedShard(long version) {
this.version = version;
this.timestamp = System.currentTimeMillis();
}
}
private final Object mutex = new Object();
@ -163,6 +173,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
return;
}
cleanFailedShards(event);
applyNewIndices(event);
applyMappings(event);
applyAliases(event);
@ -503,8 +514,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
DiscoveryNodes nodes = event.state().nodes();
cleanFailedShards(routingTable, nodes);
for (final ShardRouting shardRouting : routingNodes) {
final IndexService indexService = indicesService.indexService(shardRouting.index());
if (indexService == null) {
@ -515,9 +524,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final int shardId = shardRouting.id();
if (!indexService.hasShard(shardId) && shardRouting.started()) {
if (!failedShards.containsKey(shardRouting.shardId())) {
// the master thinks we are started, but we don't have this shard at all, mark it as failed
logger.warn("[{}][{}] master [{}] marked shard as started, but shard has not been created, mark shard as failed", shardRouting.index(), shardId, nodes.masterNode());
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, "master " + nodes.masterNode() + " marked shard as started, but shard has not been created, mark shard as failed");
}
continue;
}
@ -549,23 +561,35 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private void cleanFailedShards(RoutingTable routingTable, DiscoveryNodes nodes) {
private void cleanFailedShards(final ClusterChangedEvent event) {
RoutingTable routingTable = event.state().routingTable();
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
if (routingNodes == null) {
failedShards.clear();
return;
}
DiscoveryNodes nodes = event.state().nodes();
long now = System.currentTimeMillis();
String localNodeId = nodes.localNodeId();
Iterator<Map.Entry<ShardId, Long>> iterator = failedShards.entrySet().iterator();
Iterator<Map.Entry<ShardId, FailedShard>> iterator = failedShards.entrySet().iterator();
shards:
while (iterator.hasNext()) {
Map.Entry<ShardId, Long> entry = iterator.next();
Map.Entry<ShardId, FailedShard> entry = iterator.next();
FailedShard failedShard = entry.getValue();
IndexRoutingTable indexRoutingTable = routingTable.index(entry.getKey().getIndex());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(entry.getKey().id());
if (shardRoutingTable != null) {
for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
if (localNodeId.equals(shardRouting.currentNodeId())) {
if (shardRouting.version() == entry.getValue()) {
// It's the same failed shard - keep it
// we have a timeout here just to make sure we don't have dangled failed shards for some reason
// its just another safely layer
if (shardRouting.version() == failedShard.version && ((now - failedShard.timestamp) < TimeValue.timeValueMinutes(60).millis())) {
// It's the same failed shard - keep it if it hasn't timed out
continue shards;
} else {
// Different version - remove it
// Different version or expired, remove it
break;
}
}
@ -625,6 +649,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to remove shard after failed creation", e1, shardRouting.index(), shardRouting.id());
}
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, "Failed to create shard, message [" + detailedMessage(e) + "]");
return;
}
@ -746,7 +771,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
synchronized (mutex) {
failedShards.put(shardRouting.shardId(), shardRouting.version());
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
@ -758,6 +782,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
if (sendShardFailure) {
try {
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(failure) + "]");
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed start", e1, indexService.index().name(), shardRouting.id());
@ -796,6 +821,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
try {
failedShards.put(fShardRouting.shardId(), new FailedShard(fShardRouting.version()));
shardStateAction.shardFailed(fShardRouting, "engine failure, message [" + detailedMessage(failure) + "]");
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed engine", e1, indexService.index().name(), shardId.id());