From a9cd42c05d95c44a7c3d73eaa8cb624cba60644b Mon Sep 17 00:00:00 2001 From: Howard Date: Tue, 1 Oct 2019 17:53:53 +0800 Subject: [PATCH] Cancel recoveries even if all shards assigned (#46520) We cancel ongoing peer recoveries if a node joins the cluster with a completely up-to-date copy of a shard, because we can use such a copy to recover a replica instantly. However, today we only look for recoveries to cancel while there are unassigned shards in the cluster. This means that we do not contemplate the cancellation of the last few recoveries since recovering shards are not unassigned. It might take much longer for these recoveries to complete than would be necessary if they were cancelled. This commit fixes this by checking for cancellable recoveries even if all shards are assigned. --- .../routing/allocation/AllocationService.java | 8 +- .../gateway/GatewayAllocator.java | 5 +- .../indices/recovery/IndexRecoveryIT.java | 81 +++++++++++++++++++ 3 files changed, 88 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index eb139589724..b238946e0f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -396,11 +396,9 @@ public class AllocationService { assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; - // now allocate all the unassigned to available nodes - if (allocation.routingNodes().unassigned().size() > 0) { - removeDelayMarkers(allocation); - gatewayAllocator.allocateUnassigned(allocation); - } + removeDelayMarkers(allocation); + // try to allocate existing shard copies first + gatewayAllocator.allocateUnassigned(allocation); shardsAllocator.allocate(allocation); assert RoutingNodes.assertShardStats(allocation.routingNodes()); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 62ef18d20d2..adbbe9ee32f 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -117,7 +117,10 @@ public class GatewayAllocator { unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering primaryShardAllocator.allocateUnassigned(allocation); - replicaShardAllocator.processExistingRecoveries(allocation); + if (allocation.routingNodes().hasInactiveShards()) { + // cancel existing recoveries if we have a better match + replicaShardAllocator.processExistingRecoveries(allocation); + } replicaShardAllocator.allocateUnassigned(allocation); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index d3110461e37..5ebf84a8911 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -324,6 +325,86 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs); } + public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception { + logger.info("--> start node A"); + final String nodeA = internalCluster().startNode(); + + logger.info("--> create index on node: {}", nodeA); + createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT) + .getShards()[0].getStats().getStore().size(); + + logger.info("--> start node B"); + // force a shard recovery from nodeA to nodeB + final String nodeB = internalCluster().startNode(); + + logger.info("--> add replica for {} on node: {}", INDEX_NAME, nodeB); + assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0))); + ensureGreen(INDEX_NAME); + + logger.info("--> start node C"); + final String nodeC = internalCluster().startNode(); + + // do sync flush to gen sync id + assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0)); + + // hold peer recovery on phase 2 after nodeB down + CountDownLatch phase1ReadyBlocked = new CountDownLatch(1); + CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1); + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeA); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) { + phase1ReadyBlocked.countDown(); + try { + allowToCompletePhase1Latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> restart node B"); + internalCluster().restartNode(nodeB, + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + phase1ReadyBlocked.await(); + // nodeB stopped, peer recovery from nodeA to nodeC, it will be cancelled after nodeB get started. + RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); + + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); + + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, + false, nodeA, nodeC); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); + + return super.onNodeStopped(nodeName); + } + }); + + // wait for peer recovery from nodeA to nodeB which is a no-op recovery so it skips the CLEAN_FILES stage and hence is not blocked + ensureGreen(); + allowToCompletePhase1Latch.countDown(); + transportService.clearAllRules(); + + // make sure nodeA has primary and nodeB has replica + ClusterState state = client().admin().cluster().prepareState().get().getState(); + List startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED); + assertThat(startedShards.size(), equalTo(2)); + for (ShardRouting shardRouting : startedShards) { + if (shardRouting.primary()) { + assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeA)); + } else { + assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeB)); + } + } + } + public void testRerouteRecovery() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode();