From d78cd66b519cd130402b77d37f6ddcf57850aae8 Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Tue, 21 Jul 2015 22:06:55 +0200 Subject: [PATCH] [Test] make cluster state blocking more reliable in IndicesStoreIntegrationTests.indexCleanup() IndicesStoreIntegrationTests.indexCleanup() tests if the shard files on disk are actually removed after relocation. In particular it tests the following: Whenever a node deletes a shard because it was relocated somewhere else, it first checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest to the nodes that should have a copy. The nodes that receive this request check if the shard is in state STARTED in which case they respond with true. If they have the shard in POST_RECOVERY they register a cluster state observer that checks at each update if the shard has moved to STARTED and respond with true when this happens. To test that the cluster state observer mechanism actually works, the latter can be triggered by blocking the cluster state processing when a recover starts and only unblocking it shortly after the node receives the ShardActiveRequest. This is more reliable than using random cluster state processing delays because the random delays make it hard to reason about different timeouts that can be reached. closes #11989 --- .../store/IndicesStoreIntegrationTests.java | 89 +++++++++++++++---- 1 file changed, 73 insertions(+), 16 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java index 9a4783a2fa2..8162c951534 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java @@ -20,27 +20,33 @@ package org.elasticsearch.indices.store; import com.google.common.base.Predicate; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.disruption.SlowClusterStateProcessing; +import org.elasticsearch.test.disruption.BlockClusterStateProcessing; +import org.elasticsearch.test.disruption.SingleNodeDisruption; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportModule; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; import org.junit.Test; import java.io.IOException; @@ -48,9 +54,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Future; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static java.lang.Thread.sleep; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -69,6 +76,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { // which is between 1 and 2 sec can cause each of the shard deletion requests to timeout. // to prevent this we are setting the timeout here to something highish ie. the default in practice .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS)) + .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) .build(); } @@ -79,7 +87,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { } @Test - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11989") + @Slow public void indexCleanup() throws Exception { final String masterNode = internalCluster().startNode(Settings.builder().put("node.data", false)); final String node_1 = internalCluster().startNode(Settings.builder().put("node.master", false)); @@ -115,24 +123,30 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false)); logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish"); - SlowClusterStateProcessing disruption = null; + if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler - disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000); + SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_3, getRandom()); internalCluster().setDisruptionScheme(disruption); + MockTransportService transportServiceNode3 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_3); + CountDownLatch beginRelocationLatch = new CountDownLatch(1); + CountDownLatch endRelocationLatch = new CountDownLatch(1); + transportServiceNode3.addTracer(new ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch)); + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); + // wait for relocation to start + beginRelocationLatch.await(); disruption.startDisrupting(); + // wait for relocation to finish + endRelocationLatch.await(); + // wait a little so that cluster state observer is registered + sleep(50); + disruption.stopDisrupting(); + } else { + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); } - internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); clusterHealth = client().admin().cluster().prepareHealth() - .setWaitForNodes("4") .setWaitForRelocatingShards(0) .get(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); - if (disruption != null) { - // we must stop the disruption here, else the delayed cluster state processing on the disrupted node - // can potentially delay registering the observer in IndicesStore.ShardActiveRequestHandler.messageReceived() - // and therefore sending the response for the shard active request for more than 10s - disruption.stopDisrupting(); - } assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false)); assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false)); @@ -203,7 +217,8 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false)); } - @Test @Slow + @Test + @Slow public void testShardActiveElseWhere() throws Exception { List nodes = internalCluster().startNodesAsync(2).get(); @@ -258,6 +273,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { .build(); } + @Override public boolean runOnlyOnMaster() { return false; } @@ -306,4 +322,45 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { }); return Files.exists(indexDirectory(server, index)); } + + /** + * This Tracer can be used to signal start and end of a recovery. + * This is used to test the following: + * Whenever a node deletes a shard because it was relocated somewhere else, it first + * checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest + * to the other nodes that should have a copy according to cluster state. + * The nodes that receive this request check if the shard is in state STARTED in which case they + * respond with "true". If they have the shard in POST_RECOVERY they register a cluster state + * observer that checks at each update if the shard has moved to STARTED. + * To test that this mechanism actually works, this can be triggered by blocking the cluster + * state processing when a recover starts and only unblocking it shortly after the node receives + * the ShardActiveRequest. + */ + static class ReclocationStartEndTracer extends MockTransportService.Tracer { + private final ESLogger logger; + private final CountDownLatch beginRelocationLatch; + private final CountDownLatch receivedShardExistsRequestLatch; + + ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { + this.logger = logger; + this.beginRelocationLatch = beginRelocationLatch; + this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch; + } + + @Override + public void receivedRequest(long requestId, String action) { + if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) { + receivedShardExistsRequestLatch.countDown(); + logger.info("received: {}, relocation done", action); + } + } + + @Override + public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + if (action.equals(RecoverySource.Actions.START_RECOVERY)) { + logger.info("sent: {}, relocation starts", action); + beginRelocationLatch.countDown(); + } + } + } }