diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index 53788f7b84d..222ecad0cc1 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.gateway.AsyncShardFetch.FetchResult; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; import org.elasticsearch.index.shard.ShardStateMetaData; @@ -256,6 +257,11 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { return nodeDecisions; } + private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = + Comparator.comparing((NodeGatewayStartedShards state) -> state.storeException() == null).reversed(); + private static final Comparator PRIMARY_FIRST_COMPARATOR = + Comparator.comparing(NodeGatewayStartedShards::primary).reversed(); + /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but @@ -265,8 +271,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { Set ignoreNodes, Set inSyncAllocationIds, FetchResult shardState, Logger logger) { - LinkedList matchingNodeShardStates = new LinkedList<>(); - LinkedList nonMatchingNodeShardStates = new LinkedList<>(); + List nodeShardStates = new ArrayList<>(); int numberOfAllocationsFound = 0; for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { DiscoveryNode node = nodeShardState.getNode(); @@ -287,31 +292,36 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { } } else { final String finalAllocationId = allocationId; - logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); - allocationId = null; + if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); + } else { + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException()); + allocationId = null; + } } if (allocationId != null) { + assert nodeShardState.storeException() == null || + nodeShardState.storeException() instanceof ShardLockObtainFailedException : + "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException(); numberOfAllocationsFound++; - if (inSyncAllocationIds.contains(allocationId)) { - if (nodeShardState.primary()) { - matchingNodeShardStates.addFirst(nodeShardState); - } else { - matchingNodeShardStates.addLast(nodeShardState); - } - } else if (matchAnyShard) { - if (nodeShardState.primary()) { - nonMatchingNodeShardStates.addFirst(nodeShardState); - } else { - nonMatchingNodeShardStates.addLast(nodeShardState); - } + if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { + nodeShardStates.add(nodeShardState); } } } - List nodeShardStates = new ArrayList<>(); - nodeShardStates.addAll(matchingNodeShardStates); - nodeShardStates.addAll(nonMatchingNodeShardStates); + final Comparator comparator; // allocation preference + if (matchAnyShard) { + // prefer shards with matching allocation ids + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed(); + comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR); + } else { + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); + } + + nodeShardStates.sort(comparator); if (logger.isTraceEnabled()) { logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))); @@ -412,10 +422,19 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId()); } } else { - final long finalVerison = version; - // when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist) - logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVerison), nodeShardState.storeException()); - version = ShardStateMetaData.NO_VERSION; + final long finalVersion = version; + if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException()); + if (nodeShardState.allocationId() != null) { + version = Long.MAX_VALUE; // shard was already selected in a 5.x cluster as primary, prefer this shard copy again. + } else { + version = 0L; // treat as lowest version so that this shard is the least likely to be selected as primary + } + } else { + // disregard the reported version and assign it as no version (same as shard does not exist) + logger.trace((Supplier) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException()); + version = ShardStateMetaData.NO_VERSION; + } } if (version != ShardStateMetaData.NO_VERSION) { diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 6f345b58797..fcc079d7909 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -414,15 +414,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * segment infos and possible corruption markers. If the index can not * be opened, an exception is thrown */ - public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException { + public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException, ShardLockObtainFailedException { try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5)); Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir, shardId); SegmentInfos segInfo = Lucene.readSegmentInfos(dir); logger.trace("{} loaded segment info [{}]", shardId, segInfo); - } catch (ShardLockObtainFailedException ex) { - logger.error((Supplier) () -> new ParameterizedMessage("{} unable to acquire shard lock", shardId), ex); - throw new IOException(ex); } } diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index 2af4d49f742..08d806fa790 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardStateMetaData; import org.elasticsearch.snapshots.Snapshot; @@ -174,6 +175,69 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } + /** + * Tests that when the node returns a ShardLockObtainFailedException, it will be considered as a valid shard copy + */ + public void testShardLockObtainFailedException() { + final RoutingAllocation allocation; + boolean useAllocationIds = randomBoolean(); + if (useAllocationIds) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, + randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1"); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), + new ShardLockObtainFailedException(shardId, "test")); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1); + testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); + } + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId())); + if (useAllocationIds) { + // check that allocation id is reused + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1")); + } + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will + * select the second node as target + */ + public void testShardLockObtainFailedExceptionPreferOtherValidCopies() { + final RoutingAllocation allocation; + boolean useAllocationIds = randomBoolean(); + String allocId1 = randomAsciiOfLength(10); + String allocId2 = randomAsciiOfLength(10); + if (useAllocationIds) { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, + randomFrom(Version.V_2_0_0, Version.CURRENT), allocId1, allocId2); + testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, allocId1, randomBoolean(), + new ShardLockObtainFailedException(shardId, "test")); + testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, allocId2, randomBoolean(), null); + } else { + allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1); + testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); + if (randomBoolean()) { + testAllocator.addData(node2, randomIntBetween(2, 4), null, randomBoolean(), null); + } else { + testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some alloc id", randomBoolean(), null); + } + } + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId())); + if (useAllocationIds) { + // check that allocation id is reused + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2)); + } + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + /** * Tests that when there is a node to allocate the shard to, it will be allocated to it. */