Don't recover a shard if it just failed on this node and wasn't reassigned to this node by master yet.
When recovery of a shard fails on a node, the node sends notification to the master with information about the failure. During the period between the shard failure and the time when notification about the failure reaches the master, any changes in shard allocations can trigger the node with the failed shard to allocate this shard again. This allocation (especially if successful) creates a ripple effect of the shard going through failure/started states in order to match the delayed states processed by master. Under certain condition, a node involved in this process might generate warning messages: "marked shard as started, but shard has not been created, mark shard as failed". This fix makes sure that nodes keep track of failed shard allocations and will not try to allocate such shards repeatedly while waiting for the failure notification to be processed by master.
This commit is contained in:
parent
513c761aee
commit
ed2740a50a
|
@ -63,9 +63,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
|
|||
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static com.google.common.collect.Maps.newHashMap;
|
||||
|
@ -92,6 +90,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
// we need this so we won't remove types automatically created as part of the indexing process
|
||||
private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
// a list of shards that failed during recovery
|
||||
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
|
||||
private final ConcurrentMap<ShardId, Long> failedShards = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
private final Object mutex = new Object();
|
||||
|
||||
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
|
||||
|
@ -496,10 +498,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
RoutingTable routingTable = event.state().routingTable();
|
||||
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
|
||||
if (routingNodes == null) {
|
||||
failedShards.clear();
|
||||
return;
|
||||
}
|
||||
DiscoveryNodes nodes = event.state().nodes();
|
||||
|
||||
cleanFailedShards(routingTable, nodes);
|
||||
|
||||
for (final ShardRouting shardRouting : routingNodes) {
|
||||
final IndexService indexService = indicesService.indexService(shardRouting.index());
|
||||
|
@ -545,6 +549,33 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
}
|
||||
}
|
||||
|
||||
private void cleanFailedShards(RoutingTable routingTable, DiscoveryNodes nodes) {
|
||||
String localNodeId = nodes.localNodeId();
|
||||
Iterator<Map.Entry<ShardId, Long>> iterator = failedShards.entrySet().iterator();
|
||||
shards:
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<ShardId, Long> entry = iterator.next();
|
||||
IndexRoutingTable indexRoutingTable = routingTable.index(entry.getKey().getIndex());
|
||||
if (indexRoutingTable != null) {
|
||||
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(entry.getKey().id());
|
||||
if (shardRoutingTable != null) {
|
||||
for (ShardRouting shardRouting : shardRoutingTable.assignedShards()) {
|
||||
if (localNodeId.equals(shardRouting.currentNodeId())) {
|
||||
if (shardRouting.version() == entry.getValue()) {
|
||||
// It's the same failed shard - keep it
|
||||
continue shards;
|
||||
} else {
|
||||
// Different version - remove it
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
|
||||
private void applyInitializingShard(final RoutingTable routingTable, final DiscoveryNodes nodes, final IndexShardRoutingTable indexShardRouting, final ShardRouting shardRouting) throws ElasticSearchException {
|
||||
final IndexService indexService = indicesService.indexService(shardRouting.index());
|
||||
if (indexService == null) {
|
||||
|
@ -571,6 +602,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
}
|
||||
// if there is no shard, create it
|
||||
if (!indexService.hasShard(shardId)) {
|
||||
if (failedShards.containsKey(shardRouting.shardId())) {
|
||||
// already tried to create this shard but it failed - ignore
|
||||
logger.trace("[{}][{}] not initializing, this shards failed to recover on this node before, waiting for reassignment", shardRouting.index(), shardRouting.id());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}][{}] creating shard", shardRouting.index(), shardId);
|
||||
|
@ -710,6 +746,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
|
||||
logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
|
||||
synchronized (mutex) {
|
||||
failedShards.put(shardRouting.shardId(), shardRouting.version());
|
||||
if (indexService.hasShard(shardRouting.shardId().id())) {
|
||||
try {
|
||||
indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
|
||||
|
|
Loading…
Reference in New Issue