Merge pull request #12386 from brwe/shard-active-test

[Test] make cluster state blocking more reliable in IndicesStoreIntegrationTests.indexCleanup()
This commit is contained in:
Britta Weber 2015-07-23 11:26:47 +02:00
commit 382daf26e4
1 changed files with 73 additions and 16 deletions

View File

@ -20,27 +20,33 @@
package org.elasticsearch.indices.store;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.Test;
import java.io.IOException;
@ -48,9 +54,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -69,6 +76,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
// which is between 1 and 2 sec can cause each of the shard deletion requests to timeout.
// to prevent this we are setting the timeout here to something highish ie. the default in practice
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS))
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build();
}
@ -79,7 +87,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
}
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11989")
@Slow
public void indexCleanup() throws Exception {
final String masterNode = internalCluster().startNode(Settings.builder().put("node.data", false));
final String node_1 = internalCluster().startNode(Settings.builder().put("node.master", false));
@ -115,24 +123,30 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false));
logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
SlowClusterStateProcessing disruption = null;
if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler
disruption = new SlowClusterStateProcessing(node_3, getRandom(), 0, 0, 1000, 2000);
SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_3, getRandom());
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
}
MockTransportService transportServiceNode3 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_3);
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
CountDownLatch endRelocationLatch = new CountDownLatch(1);
transportServiceNode3.addTracer(new ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch));
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get();
// wait for relocation to start
beginRelocationLatch.await();
disruption.startDisrupting();
// wait for relocation to finish
endRelocationLatch.await();
// wait a little so that cluster state observer is registered
sleep(50);
disruption.stopDisrupting();
} else {
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get();
}
clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForNodes("4")
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
if (disruption != null) {
// we must stop the disruption here, else the delayed cluster state processing on the disrupted node
// can potentially delay registering the observer in IndicesStore.ShardActiveRequestHandler.messageReceived()
// and therefore sending the response for the shard active request for more than 10s
disruption.stopDisrupting();
}
assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false));
assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false));
@ -203,7 +217,8 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
}
@Test @Slow
@Test
@Slow
public void testShardActiveElseWhere() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(2).get();
@ -258,6 +273,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
.build();
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@ -306,4 +322,45 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
});
return Files.exists(indexDirectory(server, index));
}
/**
* This Tracer can be used to signal start and end of a recovery.
* This is used to test the following:
* Whenever a node deletes a shard because it was relocated somewhere else, it first
* checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest
* to the other nodes that should have a copy according to cluster state.
* The nodes that receive this request check if the shard is in state STARTED in which case they
* respond with "true". If they have the shard in POST_RECOVERY they register a cluster state
* observer that checks at each update if the shard has moved to STARTED.
* To test that this mechanism actually works, this can be triggered by blocking the cluster
* state processing when a recover starts and only unblocking it shortly after the node receives
* the ShardActiveRequest.
*/
static class ReclocationStartEndTracer extends MockTransportService.Tracer {
private final ESLogger logger;
private final CountDownLatch beginRelocationLatch;
private final CountDownLatch receivedShardExistsRequestLatch;
ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) {
this.logger = logger;
this.beginRelocationLatch = beginRelocationLatch;
this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch;
}
@Override
public void receivedRequest(long requestId, String action) {
if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) {
receivedShardExistsRequestLatch.countDown();
logger.info("received: {}, relocation done", action);
}
}
@Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
if (action.equals(RecoverySource.Actions.START_RECOVERY)) {
logger.info("sent: {}, relocation starts", action);
beginRelocationLatch.countDown();
}
}
}
}