[TEST] Fix race condition when blocking cluster state processing during primary relocation
Two tests were periodically failing. What both tests are doing is starting a relocation of a shard from one node to another. Once the recovery process is started, the test blocks cluster state processing on the relocation target using the BlockClusterStateProcessing disruption. The test then indefinitely waits for the relocation to complete. The stack dump shows that the relocation is stuck in the PeerRecoveryTargetService.waitForClusterState method, waiting for the relocation target node to have at least the same cluster state version as the relocation source. The reason why it gets stuck is the following race: 1) The test code executes a reroute command that relocates a shard from one node to another 2) Relocation target node starts applying the clusterstate with relocation info, starting the recovery process. 4) Recovery is super fast and quickly goes to the waitForClusterState method, which wants to ensure that the cluster state that is applied on the relocation target is at least as new as the one on the relocation source. The relocation source has already applied the cluster state but the relocation target is still in the process of applying it. The waitForClusterState method thus uses a ClusterObserver to wait for the next cluster state. Internally this means submitting a task with priority HIGH to the cluster service. 5) Cluster state application is about to finish on the relocation target. As one of the last steps, it acks to the master which makes the reroute command return successfully. 6) The test code then blocks cluster state processing on the relocation target by submitting a cluster state update task (with priority IMMEDIATE) that blocks execution. If the task that is submitted in step 6 is handled before the one in step 4 by ClusterService's thread executor, cluster state processing becomes blocked and prevents the cluster state observer from observing the applied cluster state.
This commit is contained in:
parent
524d7f592d
commit
c7edaba4e8
|
@ -40,7 +40,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
|
@ -67,7 +66,6 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
|
|||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
|
||||
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.LongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
|
@ -1127,20 +1125,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
.setSource("{\"int_field\":1}", XContentType.JSON));
|
||||
}
|
||||
indexRandom(true, indexRequestBuilderList);
|
||||
SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_2, random());
|
||||
|
||||
internalCluster().setDisruptionScheme(disruption);
|
||||
MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
|
||||
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
|
||||
CountDownLatch endRelocationLatch = new CountDownLatch(1);
|
||||
transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch,
|
||||
endRelocationLatch));
|
||||
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get();
|
||||
// wait for relocation to start
|
||||
beginRelocationLatch.await();
|
||||
disruption.startDisrupting();
|
||||
// wait for relocation to finish
|
||||
endRelocationLatch.await();
|
||||
IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
|
||||
// now search for the documents and see if we get a reply
|
||||
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
|
||||
}
|
||||
|
|
|
@ -52,7 +52,6 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
|||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
|
||||
import org.elasticsearch.test.disruption.SingleNodeDisruption;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -134,21 +133,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
|
|||
logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
|
||||
|
||||
if (randomBoolean()) { // sometimes add cluster-state delay to trigger observers in IndicesStore.ShardActiveRequestHandler
|
||||
SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_3, random());
|
||||
internalCluster().setDisruptionScheme(disruption);
|
||||
MockTransportService transportServiceNode3 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_3);
|
||||
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
|
||||
CountDownLatch endRelocationLatch = new CountDownLatch(1);
|
||||
transportServiceNode3.addTracer(new ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch));
|
||||
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_3)).get();
|
||||
// wait for relocation to start
|
||||
logger.info("--> waiting for relocation to start");
|
||||
beginRelocationLatch.await();
|
||||
logger.info("--> starting disruption");
|
||||
disruption.startDisrupting();
|
||||
// wait for relocation to finish
|
||||
logger.info("--> waiting for relocation to finish");
|
||||
endRelocationLatch.await();
|
||||
BlockClusterStateProcessing disruption = relocateAndBlockCompletion(logger, "test", 0, node_1, node_3);
|
||||
// wait a little so that cluster state observer is registered
|
||||
sleep(50);
|
||||
logger.info("--> stopping disruption");
|
||||
|
@ -170,6 +155,56 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* relocate a shard and block cluster state processing on the relocation target node to activate the shard
|
||||
*/
|
||||
public static BlockClusterStateProcessing relocateAndBlockCompletion(Logger logger, String index, int shard, String nodeFrom,
|
||||
String nodeTo) throws InterruptedException {
|
||||
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(nodeTo, random());
|
||||
internalCluster().setDisruptionScheme(disruption);
|
||||
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeTo);
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeTo);
|
||||
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
|
||||
CountDownLatch receivedShardExistsRequestLatch = new CountDownLatch(1);
|
||||
// use a tracer on the target node to track relocation start and end
|
||||
transportService.addTracer(new MockTransportService.Tracer() {
|
||||
@Override
|
||||
public void receivedRequest(long requestId, String action) {
|
||||
if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
|
||||
logger.info("received: {}, relocation starts", action);
|
||||
beginRelocationLatch.countDown();
|
||||
} else if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) {
|
||||
// Whenever a node deletes a shard because it was relocated somewhere else, it first
|
||||
// checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest
|
||||
// to the other nodes that should have a copy according to cluster state.
|
||||
receivedShardExistsRequestLatch.countDown();
|
||||
logger.info("received: {}, relocation done", action);
|
||||
} else if (action.equals(PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE)) {
|
||||
logger.info("received: {}, waiting on cluster state", action);
|
||||
// ensure that relocation target node is on the same cluster state as relocation source before proceeding with
|
||||
// this request. If the target does not have the relocating cluster state exposed through ClusterService.state(),
|
||||
// then waitForClusterState will have to register a ClusterObserver with the ClusterService, which can cause
|
||||
// a race with the BlockClusterStateProcessing block that is added below.
|
||||
try {
|
||||
assertBusy(() -> assertTrue(
|
||||
clusterService.state().routingTable().index(index).shard(shard).primaryShard().relocating()));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(index, shard, nodeFrom, nodeTo)).get();
|
||||
logger.info("--> waiting for relocation to start");
|
||||
beginRelocationLatch.await();
|
||||
logger.info("--> starting disruption");
|
||||
disruption.startDisrupting();
|
||||
logger.info("--> waiting for relocation to finish");
|
||||
receivedShardExistsRequestLatch.await();
|
||||
logger.info("--> relocation completed (but cluster state processing block still in place)");
|
||||
return disruption;
|
||||
}
|
||||
|
||||
/* Test that shard is deleted in case ShardActiveRequest after relocation and next incoming cluster state is an index delete. */
|
||||
public void testShardCleanupIfShardDeletionAfterRelocationFailedAndIndexDeleted() throws Exception {
|
||||
final String node_1 = internalCluster().startNode();
|
||||
|
@ -449,40 +484,4 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
|
|||
awaitBusy(() -> !Files.exists(indexDirectory(server, index)));
|
||||
return Files.exists(indexDirectory(server, index));
|
||||
}
|
||||
|
||||
/**
|
||||
* This Tracer can be used to signal start and end of a recovery.
|
||||
* This is used to test the following:
|
||||
* Whenever a node deletes a shard because it was relocated somewhere else, it first
|
||||
* checks if enough other copies are started somewhere else. The node sends a ShardActiveRequest
|
||||
* to the other nodes that should have a copy according to cluster state.
|
||||
* The nodes that receive this request check if the shard is in state STARTED in which case they
|
||||
* respond with "true". If they have the shard in POST_RECOVERY they register a cluster state
|
||||
* observer that checks at each update if the shard has moved to STARTED.
|
||||
* To test that this mechanism actually works, this can be triggered by blocking the cluster
|
||||
* state processing when a recover starts and only unblocking it shortly after the node receives
|
||||
* the ShardActiveRequest.
|
||||
*/
|
||||
public static class ReclocationStartEndTracer extends MockTransportService.Tracer {
|
||||
private final Logger logger;
|
||||
private final CountDownLatch beginRelocationLatch;
|
||||
private final CountDownLatch receivedShardExistsRequestLatch;
|
||||
|
||||
public ReclocationStartEndTracer(Logger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) {
|
||||
this.logger = logger;
|
||||
this.beginRelocationLatch = beginRelocationLatch;
|
||||
this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receivedRequest(long requestId, String action) {
|
||||
if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
|
||||
logger.info("received: {}, relocation starts", action);
|
||||
beginRelocationLatch.countDown();
|
||||
} else if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) {
|
||||
receivedShardExistsRequestLatch.countDown();
|
||||
logger.info("received: {}, relocation done", action);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue