Allow master to assign primary shard to node that has shard store locked during shard state fetching (#21656)
PR #19416 added a safety mechanism to shard state fetching to only access the store when the shard lock can be acquired. This can lead to the following situation however where a shard has not fully shut down yet while the shard fetching is going on, resulting in a ShardLockObtainFailedException. PrimaryShardAllocator that decides where to allocate primary shards sees this exception and treats the shard as unusable. If this is the only shard copy in the cluster, the cluster stays red and a new shard fetching cycle will not be triggered as shard state fetching treats exceptions while opening the store as permanent failures. This commit makes it so that PrimaryShardAllocator treats the locked shard as a possible allocation target (although with the least priority).
This commit is contained in:
parent
080d55a393
commit
a44655763e
|
@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
|||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||
import org.elasticsearch.gateway.AsyncShardFetch.FetchResult;
|
||||
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
|
||||
import org.elasticsearch.index.shard.ShardStateMetaData;
|
||||
|
@ -256,6 +257,11 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
return nodeDecisions;
|
||||
}
|
||||
|
||||
private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR =
|
||||
Comparator.comparing((NodeGatewayStartedShards state) -> state.storeException() == null).reversed();
|
||||
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR =
|
||||
Comparator.comparing(NodeGatewayStartedShards::primary).reversed();
|
||||
|
||||
/**
|
||||
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
|
||||
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
|
||||
|
@ -265,8 +271,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
|
||||
FetchResult<NodeGatewayStartedShards> shardState,
|
||||
Logger logger) {
|
||||
LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>();
|
||||
LinkedList<NodeGatewayStartedShards> nonMatchingNodeShardStates = new LinkedList<>();
|
||||
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
|
||||
int numberOfAllocationsFound = 0;
|
||||
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
|
||||
DiscoveryNode node = nodeShardState.getNode();
|
||||
|
@ -287,31 +292,36 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
}
|
||||
} else {
|
||||
final String finalAllocationId = allocationId;
|
||||
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
|
||||
} else {
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
|
||||
allocationId = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (allocationId != null) {
|
||||
assert nodeShardState.storeException() == null ||
|
||||
nodeShardState.storeException() instanceof ShardLockObtainFailedException :
|
||||
"only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException();
|
||||
numberOfAllocationsFound++;
|
||||
if (inSyncAllocationIds.contains(allocationId)) {
|
||||
if (nodeShardState.primary()) {
|
||||
matchingNodeShardStates.addFirst(nodeShardState);
|
||||
} else {
|
||||
matchingNodeShardStates.addLast(nodeShardState);
|
||||
}
|
||||
} else if (matchAnyShard) {
|
||||
if (nodeShardState.primary()) {
|
||||
nonMatchingNodeShardStates.addFirst(nodeShardState);
|
||||
} else {
|
||||
nonMatchingNodeShardStates.addLast(nodeShardState);
|
||||
}
|
||||
if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {
|
||||
nodeShardStates.add(nodeShardState);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
|
||||
nodeShardStates.addAll(matchingNodeShardStates);
|
||||
nodeShardStates.addAll(nonMatchingNodeShardStates);
|
||||
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
|
||||
if (matchAnyShard) {
|
||||
// prefer shards with matching allocation ids
|
||||
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
|
||||
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed();
|
||||
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR);
|
||||
} else {
|
||||
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
|
||||
}
|
||||
|
||||
nodeShardStates.sort(comparator);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")));
|
||||
|
@ -412,11 +422,20 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId());
|
||||
}
|
||||
} else {
|
||||
final long finalVerison = version;
|
||||
// when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist)
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVerison), nodeShardState.storeException());
|
||||
final long finalVersion = version;
|
||||
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
|
||||
if (nodeShardState.allocationId() != null) {
|
||||
version = Long.MAX_VALUE; // shard was already selected in a 5.x cluster as primary, prefer this shard copy again.
|
||||
} else {
|
||||
version = 0L; // treat as lowest version so that this shard is the least likely to be selected as primary
|
||||
}
|
||||
} else {
|
||||
// disregard the reported version and assign it as no version (same as shard does not exist)
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
|
||||
version = ShardStateMetaData.NO_VERSION;
|
||||
}
|
||||
}
|
||||
|
||||
if (version != ShardStateMetaData.NO_VERSION) {
|
||||
numberOfAllocationsFound++;
|
||||
|
|
|
@ -414,15 +414,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
* segment infos and possible corruption markers. If the index can not
|
||||
* be opened, an exception is thrown
|
||||
*/
|
||||
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException {
|
||||
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException, ShardLockObtainFailedException {
|
||||
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
|
||||
Directory dir = new SimpleFSDirectory(indexLocation)) {
|
||||
failIfCorrupted(dir, shardId);
|
||||
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
|
||||
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
|
||||
} catch (ShardLockObtainFailedException ex) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("{} unable to acquire shard lock", shardId), ex);
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardStateMetaData;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
|
@ -174,6 +175,69 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when the node returns a ShardLockObtainFailedException, it will be considered as a valid shard copy
|
||||
*/
|
||||
public void testShardLockObtainFailedException() {
|
||||
final RoutingAllocation allocation;
|
||||
boolean useAllocationIds = randomBoolean();
|
||||
if (useAllocationIds) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
|
||||
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
|
||||
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(),
|
||||
new ShardLockObtainFailedException(shardId, "test"));
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
|
||||
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
|
||||
}
|
||||
testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(allocation.routingNodesChanged(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
|
||||
if (useAllocationIds) {
|
||||
// check that allocation id is reused
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
|
||||
}
|
||||
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will
|
||||
* select the second node as target
|
||||
*/
|
||||
public void testShardLockObtainFailedExceptionPreferOtherValidCopies() {
|
||||
final RoutingAllocation allocation;
|
||||
boolean useAllocationIds = randomBoolean();
|
||||
String allocId1 = randomAsciiOfLength(10);
|
||||
String allocId2 = randomAsciiOfLength(10);
|
||||
if (useAllocationIds) {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
|
||||
randomFrom(Version.V_2_0_0, Version.CURRENT), allocId1, allocId2);
|
||||
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, allocId1, randomBoolean(),
|
||||
new ShardLockObtainFailedException(shardId, "test"));
|
||||
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, allocId2, randomBoolean(), null);
|
||||
} else {
|
||||
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
|
||||
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
|
||||
if (randomBoolean()) {
|
||||
testAllocator.addData(node2, randomIntBetween(2, 4), null, randomBoolean(), null);
|
||||
} else {
|
||||
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some alloc id", randomBoolean(), null);
|
||||
}
|
||||
}
|
||||
testAllocator.allocateUnassigned(allocation);
|
||||
assertThat(allocation.routingNodesChanged(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
|
||||
if (useAllocationIds) {
|
||||
// check that allocation id is reused
|
||||
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2));
|
||||
}
|
||||
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue