From 51d6b9ab316906abc80950c7f4a980ff2bf1a6cb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 12 Feb 2019 14:02:56 +0100 Subject: [PATCH] Fix CloseWhileRelocatingShardsIT (#38728) --- .../state/CloseWhileRelocatingShardsIT.java | 81 +++++++++++++------ 1 file changed, 56 insertions(+), 25 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java index 99c50a839ab..2125184baef 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java @@ -18,8 +18,10 @@ */ package org.elasticsearch.indices.state; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -32,13 +34,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocation import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.StartRecoveryRequest; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -57,6 +60,7 @@ import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed; import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { @@ -68,9 +72,11 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { + final int maxRecoveries = Integer.MAX_VALUE; return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), maxRecoveries) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), maxRecoveries) .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1) .build(); } @@ -80,7 +86,6 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { return 3; } - @TestLogging("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG") public void testCloseWhileRelocatingShards() throws Exception { final String[] indices = new String[randomIntBetween(3, 5)]; final Map docsPerIndex = new HashMap<>(); @@ -119,21 +124,19 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { final String targetNode = internalCluster().startDataOnlyNode(); ensureClusterSizeConsistency(); // wait for the master to finish processing join. - final MockTransportService targetTransportService = - (MockTransportService) internalCluster().getInstance(TransportService.class, targetNode); - final Set acknowledgedCloses = ConcurrentCollections.newConcurrentSet(); try { final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()); + final ClusterState state = clusterService.state(); final CountDownLatch latch = new CountDownLatch(indices.length); - final CountDownLatch release = new CountDownLatch(1); + final CountDownLatch release = new CountDownLatch(indices.length); // relocate one shard for every index to be closed final AllocationCommands commands = new AllocationCommands(); for (final String index : indices) { final NumShards numShards = getNumShards(index); final int shardId = numShards.numPrimaries == 1 ? 0 : randomIntBetween(0, numShards.numPrimaries - 1); - final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); + final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard(); assertTrue(primary.started()); @@ -146,24 +149,49 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { currentNodeId = replica.currentNodeId(); } } + commands.add(new MoveAllocationCommand(index, shardId, state.nodes().resolveNode(currentNodeId).getName(), targetNode)); + } - final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId()); - targetTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()), - (connection, requestId, action, request, options) -> { - if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) { - logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId()); - latch.countDown(); - try { - release.await(); - logger.debug("releasing recovery of shard {}", ((StartRecoveryRequest) request).shardId()); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - } - connection.sendRequest(requestId, action, request, options); + // Build the list of shards for which recoveries will be blocked + final Set blockedShards = commands.commands().stream() + .map(c -> (MoveAllocationCommand) c) + .map(c -> new ShardId(clusterService.state().metaData().index(c.index()).getIndex(), c.shardId())) + .collect(Collectors.toSet()); + assertThat(blockedShards, hasSize(indices.length)); + + final Set acknowledgedCloses = ConcurrentCollections.newConcurrentSet(); + final Set interruptedRecoveries = ConcurrentCollections.newConcurrentSet(); + + // Create a SendRequestBehavior that will block outgoing start recovery request + final StubbableTransport.SendRequestBehavior sendBehavior = (connection, requestId, action, request, options) -> { + if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) { + final StartRecoveryRequest startRecoveryRequest = ((StartRecoveryRequest) request); + if (blockedShards.contains(startRecoveryRequest.shardId())) { + logger.debug("blocking recovery of shard {}", startRecoveryRequest.shardId()); + latch.countDown(); + try { + release.await(); + logger.debug("releasing recovery of shard {}", startRecoveryRequest.shardId()); + } catch (final InterruptedException e) { + logger.warn(() -> new ParameterizedMessage("exception when releasing recovery of shard {}", + startRecoveryRequest.shardId()), e); + interruptedRecoveries.add(startRecoveryRequest.shardId().getIndexName()); + Thread.currentThread().interrupt(); + return; } - ); - commands.add(new MoveAllocationCommand(index, shardId, currentNodeId, targetNode)); + } + } + connection.sendRequest(requestId, action, request, options); + }; + + final MockTransportService targetTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, targetNode); + + for (DiscoveryNode node : state.getNodes()) { + if (node.isDataNode() && node.getName().equals(targetNode) == false) { + final TransportService sourceTransportService = internalCluster().getInstance(TransportService.class, node.getName()); + targetTransportService.addSendBehavior(sourceTransportService, sendBehavior); + } } assertAcked(client().admin().cluster().reroute(new ClusterRerouteRequest().commands(commands)).get()); @@ -222,12 +250,15 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { targetTransportService.clearAllRules(); + // If a shard recovery has been interrupted, we expect its index to be closed + interruptedRecoveries.forEach(CloseIndexIT::assertIndexIsClosed); + assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0)); assertAcked(client().admin().indices().prepareOpen("index-*")); ensureGreen(indices); for (String index : acknowledgedCloses) { - long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value; + long docsCount = client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get().getHits().getTotalHits().value; assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount + " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount); }