From a44655763e150522d58cbd8113fb1b30a475b8c7 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 22 Nov 2016 19:35:47 +0100 Subject: [PATCH] Allow master to assign primary shard to node that has shard store locked during shard state fetching (#21656) PR #19416 added a safety mechanism to shard state fetching so that the store is only accessed when the shard lock can be acquired. This can, however, lead to a situation where a shard has not fully shut down yet while shard state fetching is going on, resulting in a ShardLockObtainFailedException. PrimaryShardAllocator, which decides where to allocate primary shards, sees this exception and treats the shard copy as unusable. If this is the only shard copy in the cluster, the cluster stays red and a new shard fetching cycle is not triggered, because shard state fetching treats exceptions while opening the store as permanent failures. This commit makes PrimaryShardAllocator treat the locked shard as a possible allocation target (although with the lowest priority). 
--- .../gateway/PrimaryShardAllocator.java | 65 ++++++++++++------- .../org/elasticsearch/index/store/Store.java | 5 +- .../gateway/PrimaryShardAllocatorTests.java | 64 ++++++++++++++++++ 3 files changed, 107 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index 53788f7b84d..222ecad0cc1 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.gateway.AsyncShardFetch.FetchResult; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; import org.elasticsearch.index.shard.ShardStateMetaData; @@ -256,6 +257,11 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { return nodeDecisions; } + private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = + Comparator.comparing((NodeGatewayStartedShards state) -> state.storeException() == null).reversed(); + private static final Comparator PRIMARY_FIRST_COMPARATOR = + Comparator.comparing(NodeGatewayStartedShards::primary).reversed(); + /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching * inSyncAllocationIds are added to the list. 
Otherwise, any node that has a shard is added to the list, but @@ -265,8 +271,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { Set ignoreNodes, Set inSyncAllocationIds, FetchResult shardState, Logger logger) { - LinkedList matchingNodeShardStates = new LinkedList<>(); - LinkedList nonMatchingNodeShardStates = new LinkedList<>(); + List nodeShardStates = new ArrayList<>(); int numberOfAllocationsFound = 0; for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { DiscoveryNode node = nodeShardState.getNode(); @@ -287,31 +292,36 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { } } else { final String finalAllocationId = allocationId; - logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); - allocationId = null; + if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); + } else { + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); + allocationId = null; + } } if (allocationId != null) { + assert nodeShardState.storeException() == null || + nodeShardState.storeException() instanceof ShardLockObtainFailedException : + "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException(); numberOfAllocationsFound++; - if 
(inSyncAllocationIds.contains(allocationId)) { - if (nodeShardState.primary()) { - matchingNodeShardStates.addFirst(nodeShardState); - } else { - matchingNodeShardStates.addLast(nodeShardState); - } - } else if (matchAnyShard) { - if (nodeShardState.primary()) { - nonMatchingNodeShardStates.addFirst(nodeShardState); - } else { - nonMatchingNodeShardStates.addLast(nodeShardState); - } + if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { + nodeShardStates.add(nodeShardState); } } } - List nodeShardStates = new ArrayList<>(); - nodeShardStates.addAll(matchingNodeShardStates); - nodeShardStates.addAll(nonMatchingNodeShardStates); + final Comparator comparator; // allocation preference + if (matchAnyShard) { + // prefer shards with matching allocation ids + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed(); + comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR); + } else { + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); + } + + nodeShardStates.sort(comparator); if (logger.isTraceEnabled()) { logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))); @@ -412,10 +422,19 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId()); } } else { - final long finalVerison = version; - // when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist) - logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), 
finalVerison), nodeShardState.storeException()); - version = ShardStateMetaData.NO_VERSION; + final long finalVersion = version; + if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException()); + if (nodeShardState.allocationId() != null) { + version = Long.MAX_VALUE; // shard was already selected in a 5.x cluster as primary, prefer this shard copy again. + } else { + version = 0L; // treat as lowest version so that this shard is the least likely to be selected as primary + } + } else { + // disregard the reported version and assign it as no version (same as shard does not exist) + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException()); + version = ShardStateMetaData.NO_VERSION; + } } if (version != ShardStateMetaData.NO_VERSION) { diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 6f345b58797..fcc079d7909 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -414,15 +414,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * segment infos and possible corruption markers. 
If the index can not * be opened, an exception is thrown */ - public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException { + public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException, ShardLockObtainFailedException { try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5)); Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir, shardId); SegmentInfos segInfo = Lucene.readSegmentInfos(dir); logger.trace("{} loaded segment info [{}]", shardId, segInfo); - } catch (ShardLockObtainFailedException ex) { - logger.error((Supplier) () -> new ParameterizedMessage("{} unable to acquire shard lock", shardId), ex); - throw new IOException(ex); } } diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index 2af4d49f742..08d806fa790 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardStateMetaData; import org.elasticsearch.snapshots.Snapshot; @@ -174,6 +175,69 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } + /** + * Tests that when the node returns a ShardLockObtainFailedException, it will be considered as a valid shard copy + */ + public void testShardLockObtainFailedException() { + final 
RoutingAllocation allocation; + boolean useAllocationIds = randomBoolean(); + if (useAllocationIds) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, + randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), + new ShardLockObtainFailedException(shardId, "test")); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1); + testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); + } + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); + if (useAllocationIds) { + // check that allocation id is reused + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1")); + } + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will + * select the second node as target + */ + public void testShardLockObtainFailedExceptionPreferOtherValidCopies() { + final RoutingAllocation allocation; + boolean useAllocationIds = randomBoolean(); + String allocId1 = randomAsciiOfLength(10); + String allocId2 = randomAsciiOfLength(10); + if (useAllocationIds) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, + randomFrom(Version.V_2_0_0, Version.CURRENT), allocId1, allocId2); + 
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, allocId1, randomBoolean(), + new ShardLockObtainFailedException(shardId, "test")); + testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, allocId2, randomBoolean(), null); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1); + testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); + if (randomBoolean()) { + testAllocator.addData(node2, randomIntBetween(2, 4), null, randomBoolean(), null); + } else { + testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some alloc id", randomBoolean(), null); + } + } + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId())); + if (useAllocationIds) { + // check that allocation id is reused + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2)); + } + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + /** * Tests that when there is a node to allocate the shard to, it will be allocated to it. */