[CORE] Don't update indexShard if it has been removed before
Today we have logic that removes a shard from the indexservice if the shard has changed ie. from replica to primary or if it's recovery source vanished etc. This can cause shards from been not allocated at all on a nodes causeing delete requests to timeout since we were waiting for shards on nodes that got dropped due to a IndexShardMissingException Closes #7509
This commit is contained in:
parent
c0aef4adc4
commit
1bb0677df7
|
@ -43,6 +43,7 @@ import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.compress.CompressedString;
|
import org.elasticsearch.common.compress.CompressedString;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.inject.Injector;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
|
@ -548,11 +549,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
// if the current and global routing are initializing, but are still not the same, its a different "shard" being allocated
|
// if the current and global routing are initializing, but are still not the same, its a different "shard" being allocated
|
||||||
// for example: a shard that recovers from one node and now needs to recover to another node,
|
// for example: a shard that recovers from one node and now needs to recover to another node,
|
||||||
// or a replica allocated and then allocating a primary because the primary failed on another node
|
// or a replica allocated and then allocating a primary because the primary failed on another node
|
||||||
|
boolean shardHasBeenRemoved = false;
|
||||||
if (currentRoutingEntry.initializing() && shardRouting.initializing() && !currentRoutingEntry.equals(shardRouting)) {
|
if (currentRoutingEntry.initializing() && shardRouting.initializing() && !currentRoutingEntry.equals(shardRouting)) {
|
||||||
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
|
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
|
||||||
// cancel recovery just in case we are in recovery (its fine if we are not in recovery, it will be a noop).
|
// cancel recovery just in case we are in recovery (its fine if we are not in recovery, it will be a noop).
|
||||||
recoveryTarget.cancelRecovery(indexShard);
|
recoveryTarget.cancelRecovery(indexShard);
|
||||||
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
|
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
|
||||||
|
shardHasBeenRemoved = true;
|
||||||
} else if (isPeerRecovery(shardRouting)) {
|
} else if (isPeerRecovery(shardRouting)) {
|
||||||
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
|
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
|
||||||
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
|
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
|
||||||
|
@ -563,11 +566,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
|
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
|
||||||
recoveryTarget.cancelRecovery(indexShard);
|
recoveryTarget.cancelRecovery(indexShard);
|
||||||
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
|
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
|
||||||
|
shardHasBeenRemoved = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (shardHasBeenRemoved == false && !shardRouting.equals(indexShard.routingEntry())) {
|
||||||
if (!shardRouting.equals(indexShard.routingEntry())) {
|
// if we happen to remove the shardRouting by id above we don't need to jump in here!
|
||||||
indexShard.routingEntry(shardRouting);
|
indexShard.routingEntry(shardRouting);
|
||||||
indexService.shardInjectorSafe(shardId).getInstance(IndexShardGatewayService.class).routingStateChanged();
|
indexService.shardInjectorSafe(shardId).getInstance(IndexShardGatewayService.class).routingStateChanged();
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,9 +99,9 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
|
||||||
.setSettings(settings)
|
.setSettings(settings)
|
||||||
.addMapping("type", mapping).execute().actionGet();
|
.addMapping("type", mapping).execute().actionGet();
|
||||||
numInitialDocs = between(10, 100);
|
numInitialDocs = between(10, 100);
|
||||||
ensureYellow();
|
ensureGreen();
|
||||||
for (int i = 0; i < numInitialDocs ; i++) {
|
for (int i = 0; i < numInitialDocs ; i++) {
|
||||||
client().prepareIndex("test", "initial", "" + i).setTimeout(TimeValue.timeValueSeconds(1)).setSource("test", "init").get();
|
client().prepareIndex("test", "initial", "" + i).setSource("test", "init").get();
|
||||||
}
|
}
|
||||||
client().admin().indices().prepareRefresh("test").execute().get();
|
client().admin().indices().prepareRefresh("test").execute().get();
|
||||||
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().get();
|
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().get();
|
||||||
|
|
Loading…
Reference in New Issue