diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index eb139589724..b238946e0f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -396,11 +396,9 @@ public class AllocationService { assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; - // now allocate all the unassigned to available nodes - if (allocation.routingNodes().unassigned().size() > 0) { - removeDelayMarkers(allocation); - gatewayAllocator.allocateUnassigned(allocation); - } + removeDelayMarkers(allocation); + // try to allocate existing shard copies first + gatewayAllocator.allocateUnassigned(allocation); shardsAllocator.allocate(allocation); assert RoutingNodes.assertShardStats(allocation.routingNodes()); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 62ef18d20d2..adbbe9ee32f 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -117,7 +117,10 @@ public class GatewayAllocator { unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering primaryShardAllocator.allocateUnassigned(allocation); - replicaShardAllocator.processExistingRecoveries(allocation); + if (allocation.routingNodes().hasInactiveShards()) { + // cancel existing recoveries if we have a better match + replicaShardAllocator.processExistingRecoveries(allocation); + } replicaShardAllocator.allocateUnassigned(allocation); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index d3110461e37..5ebf84a8911 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -324,6 +325,86 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs); } + public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception { + logger.info("--> start node A"); + final String nodeA = internalCluster().startNode(); + + logger.info("--> create index on node: {}", nodeA); + createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT) + .getShards()[0].getStats().getStore().size(); + + logger.info("--> start node B"); + // force a shard recovery from nodeA to nodeB + final String nodeB = internalCluster().startNode(); + + logger.info("--> add replica for {} on node: {}", INDEX_NAME, nodeB); + assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0))); + ensureGreen(INDEX_NAME); + + logger.info("--> start node C"); + final String nodeC = internalCluster().startNode(); + + // do sync flush to gen sync id + assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0)); + + // hold peer recovery on phase 2 after nodeB down + CountDownLatch phase1ReadyBlocked = new CountDownLatch(1); + CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1); + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeA); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) { + phase1ReadyBlocked.countDown(); + try { + allowToCompletePhase1Latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> restart node B"); + internalCluster().restartNode(nodeB, + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + phase1ReadyBlocked.await(); + // nodeB stopped, peer recovery from nodeA to nodeC, it will be cancelled after nodeB get started. + RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); + + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); + + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, + false, nodeA, nodeC); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); + + return super.onNodeStopped(nodeName); + } + }); + + // wait for peer recovery from nodeA to nodeB which is a no-op recovery so it skips the CLEAN_FILES stage and hence is not blocked + ensureGreen(); + allowToCompletePhase1Latch.countDown(); + transportService.clearAllRules(); + + // make sure nodeA has primary and nodeB has replica + ClusterState state = client().admin().cluster().prepareState().get().getState(); + List startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED); + assertThat(startedShards.size(), equalTo(2)); + for (ShardRouting shardRouting : startedShards) { + if (shardRouting.primary()) { + assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeA)); + } else { + assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeB)); + } + } + } + public void testRerouteRecovery() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode();