[Test] make cluster state blocking more reliable in IndicesStoreIntegrationTests.indexCleanup()

IndicesStoreIntegrationTests.indexCleanup() tests if the shard files on disk are actually removed
after relocation. In particular it tests the following:
Whenever a node deletes a shard because it was relocated somewhere else, it first
checks if enough other copies are started somewhere else. The node sends a
ShardActiveRequest to the nodes that should have a copy.
The nodes that receive this request check if the shard is in state STARTED in which case
they respond with true. If they have the shard in POST_RECOVERY they register a cluster state
observer that checks at each update if the shard has moved to STARTED and respond with true when
this happens.

To test that the cluster state observer mechanism actually works, the latter can be triggered by
blocking the cluster state processing when a recover starts and only unblocking it shortly
after the node receives the ShardActiveRequest.

This is more reliable than using random cluster state processing delays
because the random delays make it hard to reason about different timeouts
that can be reached.

closes #11989
This commit is contained in:
Britta Weber 2015-07-21 22:06:55 +02:00
parent 14f8671a48
commit d78cd66b51
1 changed files with 73 additions and 16 deletions

View File

@ -20,27 +20,33 @@
package org.elasticsearch.indices.store; package org.elasticsearch.indices.store;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing; import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
@ -48,9 +54,10 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -69,6 +76,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
// which is between 1 and 2 sec can cause each of the shard deletion requests to timeout. // which is between 1 and 2 sec can cause each of the shard deletion requests to timeout.
// to prevent this we are setting the timeout here to something highish ie. the default in practice // to prevent this we are setting the timeout here to something highish ie. the default in practice
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS)) .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS))
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build(); .build();
} }
@ -79,7 +87,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
} }
@Test @Test
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11989") @Slow
public void indexCleanup() throws Exception { public void indexCleanup() throws Exception {
final String masterNode = internalCluster().startNode(Settings.builder().put("node.data", false)); final String masterNode = internalCluster().startNode(Settings.builder().put("node.data", false));
final String node_1 = internalCluster().startNode(Settings.builder().put("node.master", false)); final String node_1 = internalCluster().startNode(Settings.builder().put("node.master", false));
@ -115,24 +123,30 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false)); assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false));
logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish"); logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
SlowClusterStateProcessing disruption = null;
if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler
disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000); SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_3, getRandom());
internalCluster().setDisruptionScheme(disruption); internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting(); MockTransportService transportServiceNode3 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_3);
} CountDownLatch beginRelocationLatch = new CountDownLatch(1);
CountDownLatch endRelocationLatch = new CountDownLatch(1);
transportServiceNode3.addTracer(new ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch));
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get(); internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get();
// wait for relocation to start
beginRelocationLatch.await();
disruption.startDisrupting();
// wait for relocation to finish
endRelocationLatch.await();
// wait a little so that cluster state observer is registered
sleep(50);
disruption.stopDisrupting();
} else {
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get();
}
clusterHealth = client().admin().cluster().prepareHealth() clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForNodes("4")
.setWaitForRelocatingShards(0) .setWaitForRelocatingShards(0)
.get(); .get();
assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.isTimedOut(), equalTo(false));
if (disruption != null) {
// we must stop the disruption here, else the delayed cluster state processing on the disrupted node
// can potentially delay registering the observer in IndicesStore.ShardActiveRequestHandler.messageReceived()
// and therefore sending the response for the shard active request for more than 10s
disruption.stopDisrupting();
}
assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false)); assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false));
assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false)); assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false));
@ -203,7 +217,8 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false)); assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
} }
@Test @Slow @Test
@Slow
public void testShardActiveElseWhere() throws Exception { public void testShardActiveElseWhere() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(2).get(); List<String> nodes = internalCluster().startNodesAsync(2).get();
@ -258,6 +273,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
.build(); .build();
} }
@Override
public boolean runOnlyOnMaster() { public boolean runOnlyOnMaster() {
return false; return false;
} }
@ -306,4 +322,45 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
}); });
return Files.exists(indexDirectory(server, index)); return Files.exists(indexDirectory(server, index));
} }
/**
* This Tracer can be used to signal start and end of a recovery.
* This is used to test the following:
* Whenever a node deletes a shard because it was relocated somewhere else, it first
* checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest
* to the other nodes that should have a copy according to cluster state.
* The nodes that receive this request check if the shard is in state STARTED in which case they
* respond with "true". If they have the shard in POST_RECOVERY they register a cluster state
* observer that checks at each update if the shard has moved to STARTED.
* To test that this mechanism actually works, this can be triggered by blocking the cluster
* state processing when a recover starts and only unblocking it shortly after the node receives
* the ShardActiveRequest.
*/
static class ReclocationStartEndTracer extends MockTransportService.Tracer {
private final ESLogger logger;
private final CountDownLatch beginRelocationLatch;
private final CountDownLatch receivedShardExistsRequestLatch;
ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) {
this.logger = logger;
this.beginRelocationLatch = beginRelocationLatch;
this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch;
}
@Override
public void receivedRequest(long requestId, String action) {
if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) {
receivedShardExistsRequestLatch.countDown();
logger.info("received: {}, relocation done", action);
}
}
@Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
if (action.equals(RecoverySource.Actions.START_RECOVERY)) {
logger.info("sent: {}, relocation starts", action);
beginRelocationLatch.countDown();
}
}
}
} }