diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 784ba65796d..595d9a6d3f1 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -40,7 +40,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -67,7 +66,6 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration; import org.elasticsearch.test.discovery.TestZenDiscovery; -import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.disruption.IntermittentLongGCDisruption; import org.elasticsearch.test.disruption.LongGCDisruption; import org.elasticsearch.test.disruption.NetworkDisruption; @@ -1127,20 +1125,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { .setSource("{\"int_field\":1}", XContentType.JSON)); } indexRandom(true, indexRequestBuilderList); - SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_2, random()); - internalCluster().setDisruptionScheme(disruption); - MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2); - CountDownLatch beginRelocationLatch = new CountDownLatch(1); - CountDownLatch endRelocationLatch = new CountDownLatch(1); - transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, - endRelocationLatch)); - internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get(); - // wait for relocation to start - beginRelocationLatch.await(); - disruption.startDisrupting(); - // wait for relocation to finish - endRelocationLatch.await(); + IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2); // now search for the documents and see if we get a reply assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L)); } diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index df32324987c..13f00b152df 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -52,7 +52,6 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.BlockClusterStateProcessing; -import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportRequest; @@ -134,21 +133,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish"); if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler - SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_3, random()); - internalCluster().setDisruptionScheme(disruption); - MockTransportService transportServiceNode3 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_3); - CountDownLatch beginRelocationLatch = new CountDownLatch(1); - CountDownLatch endRelocationLatch = new CountDownLatch(1); - transportServiceNode3.addTracer(new ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch)); - internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_3)).get(); - // wait for relocation to start - logger.info("--> waiting for relocation to start"); - beginRelocationLatch.await(); - logger.info("--> starting disruption"); - disruption.startDisrupting(); - // wait for relocation to finish - logger.info("--> waiting for relocation to finish"); - endRelocationLatch.await(); + BlockClusterStateProcessing disruption = relocateAndBlockCompletion(logger, "test", 0, node_1, node_3); // wait a little so that cluster state observer is registered sleep(50); logger.info("--> stopping disruption"); @@ -170,6 +155,56 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { } + /** + * relocate a shard and block cluster state processing on the relocation target node to activate the shard + */ + public static BlockClusterStateProcessing relocateAndBlockCompletion(Logger logger, String index, int shard, String nodeFrom, + String nodeTo) throws InterruptedException { + BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(nodeTo, random()); + internalCluster().setDisruptionScheme(disruption); + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeTo); + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeTo); + CountDownLatch beginRelocationLatch = new CountDownLatch(1); + CountDownLatch receivedShardExistsRequestLatch = new CountDownLatch(1); + // use a tracer on the target node to track relocation start and end + transportService.addTracer(new MockTransportService.Tracer() { + @Override + public void receivedRequest(long requestId, String action) { + if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) { + logger.info("received: {}, relocation starts", action); + beginRelocationLatch.countDown(); + } else if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) { + // Whenever a node deletes a shard because it was relocated somewhere else, it first + // checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest + // to the other nodes that should have a copy according to cluster state. + receivedShardExistsRequestLatch.countDown(); + logger.info("received: {}, relocation done", action); + } else if (action.equals(PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE)) { + logger.info("received: {}, waiting on cluster state", action); + // ensure that relocation target node is on the same cluster state as relocation source before proceeding with + // this request. If the target does not have the relocating cluster state exposed through ClusterService.state(), + // then waitForClusterState will have to register a ClusterObserver with the ClusterService, which can cause + // a race with the BlockClusterStateProcessing block that is added below. + try { + assertBusy(() -> assertTrue( + clusterService.state().routingTable().index(index).shard(shard).primaryShard().relocating())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + }); + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(index, shard, nodeFrom, nodeTo)).get(); + logger.info("--> waiting for relocation to start"); + beginRelocationLatch.await(); + logger.info("--> starting disruption"); + disruption.startDisrupting(); + logger.info("--> waiting for relocation to finish"); + receivedShardExistsRequestLatch.await(); + logger.info("--> relocation completed (but cluster state processing block still in place)"); + return disruption; + } + /* Test that shard is deleted in case ShardActiveRequest after relocation and next incoming cluster state is an index delete. */ public void testShardCleanupIfShardDeletionAfterRelocationFailedAndIndexDeleted() throws Exception { final String node_1 = internalCluster().startNode(); @@ -449,40 +484,4 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { awaitBusy(() -> !Files.exists(indexDirectory(server, index))); return Files.exists(indexDirectory(server, index)); } - - /** - * This Tracer can be used to signal start and end of a recovery. - * This is used to test the following: - * Whenever a node deletes a shard because it was relocated somewhere else, it first - * checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest - * to the other nodes that should have a copy according to cluster state. - * The nodes that receive this request check if the shard is in state STARTED in which case they - * respond with "true". If they have the shard in POST_RECOVERY they register a cluster state - * observer that checks at each update if the shard has moved to STARTED. - * To test that this mechanism actually works, this can be triggered by blocking the cluster - * state processing when a recover starts and only unblocking it shortly after the node receives - * the ShardActiveRequest. - */ - public static class ReclocationStartEndTracer extends MockTransportService.Tracer { - private final Logger logger; - private final CountDownLatch beginRelocationLatch; - private final CountDownLatch receivedShardExistsRequestLatch; - - public ReclocationStartEndTracer(Logger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { - this.logger = logger; - this.beginRelocationLatch = beginRelocationLatch; - this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch; - } - - @Override - public void receivedRequest(long requestId, String action) { - if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) { - logger.info("received: {}, relocation starts", action); - beginRelocationLatch.countDown(); - } else if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) { - receivedShardExistsRequestLatch.countDown(); - logger.info("received: {}, relocation done", action); - } - } - } }