diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java index 9a4783a2fa2..8162c951534 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java @@ -20,27 +20,33 @@ package org.elasticsearch.indices.store; import com.google.common.base.Predicate; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.disruption.SlowClusterStateProcessing; +import org.elasticsearch.test.disruption.BlockClusterStateProcessing; +import org.elasticsearch.test.disruption.SingleNodeDisruption; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportModule; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; import org.junit.Test; import java.io.IOException; @@ -48,9 +54,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Future; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static java.lang.Thread.sleep; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -69,6 +76,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { // which is between 1 and 2 sec can cause each of the shard deletion requests to timeout. // to prevent this we are setting the timeout here to something highish ie. the default in practice .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS)) + .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) .build(); } @@ -79,7 +87,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { } @Test - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11989") + @Slow public void indexCleanup() throws Exception { final String masterNode = internalCluster().startNode(Settings.builder().put("node.data", false)); final String node_1 = internalCluster().startNode(Settings.builder().put("node.master", false)); @@ -115,24 +123,30 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false)); logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish"); - SlowClusterStateProcessing disruption = null; + if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler - disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000); + SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_3, getRandom()); internalCluster().setDisruptionScheme(disruption); + MockTransportService transportServiceNode3 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_3); + CountDownLatch beginRelocationLatch = new CountDownLatch(1); + CountDownLatch endRelocationLatch = new CountDownLatch(1); + transportServiceNode3.addTracer(new ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch)); + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); + // wait for relocation to start + beginRelocationLatch.await(); disruption.startDisrupting(); + // wait for relocation to finish + endRelocationLatch.await(); + // wait a little so that cluster state observer is registered + sleep(50); + disruption.stopDisrupting(); + } else { + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); } - internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); clusterHealth = client().admin().cluster().prepareHealth() - .setWaitForNodes("4") .setWaitForRelocatingShards(0) .get(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); - if (disruption != null) { - // we must stop the disruption here, else the delayed cluster state processing on the disrupted node - // can potentially delay registering the observer in IndicesStore.ShardActiveRequestHandler.messageReceived() - // and therefore sending the response for the shard active request for more than 10s - disruption.stopDisrupting(); - } assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false)); assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false)); @@ -203,7 +217,8 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false)); } - @Test @Slow + @Test + @Slow public void testShardActiveElseWhere() throws Exception { List nodes = internalCluster().startNodesAsync(2).get(); @@ -258,6 +273,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { .build(); } + @Override public boolean runOnlyOnMaster() { return false; } @@ -306,4 +322,45 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest { }); return Files.exists(indexDirectory(server, index)); } + + /** + * This Tracer can be used to signal start and end of a recovery. + * This is used to test the following: + * Whenever a node deletes a shard because it was relocated somewhere else, it first + * checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest + * to the other nodes that should have a copy according to cluster state. + * The nodes that receive this request check if the shard is in state STARTED in which case they + * respond with "true". If they have the shard in POST_RECOVERY they register a cluster state + * observer that checks at each update if the shard has moved to STARTED. + * To test that this mechanism actually works, this can be triggered by blocking the cluster + * state processing when a recover starts and only unblocking it shortly after the node receives + * the ShardActiveRequest. + */ + static class ReclocationStartEndTracer extends MockTransportService.Tracer { + private final ESLogger logger; + private final CountDownLatch beginRelocationLatch; + private final CountDownLatch receivedShardExistsRequestLatch; + + ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { + this.logger = logger; + this.beginRelocationLatch = beginRelocationLatch; + this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch; + } + + @Override + public void receivedRequest(long requestId, String action) { + if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) { + receivedShardExistsRequestLatch.countDown(); + logger.info("received: {}, relocation done", action); + } + } + + @Override + public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + if (action.equals(RecoverySource.Actions.START_RECOVERY)) { + logger.info("sent: {}, relocation starts", action); + beginRelocationLatch.countDown(); + } + } + } }