Fix CloseWhileRelocatingShardsIT (#38728)
This commit is contained in:
parent
bbc9aa9979
commit
51d6b9ab31
|
@ -18,8 +18,10 @@
|
|||
*/
|
||||
package org.elasticsearch.indices.state;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -32,13 +34,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocation
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
|
||||
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.BackgroundIndexer;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.test.transport.StubbableTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -57,6 +60,7 @@ import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
|
|||
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
|
||||
public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
||||
|
@ -68,9 +72,11 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
|||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
final int maxRecoveries = Integer.MAX_VALUE;
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
|
||||
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), maxRecoveries)
|
||||
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), maxRecoveries)
|
||||
.put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
|
||||
.build();
|
||||
}
|
||||
|
@ -80,7 +86,6 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
|||
return 3;
|
||||
}
|
||||
|
||||
@TestLogging("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG")
|
||||
public void testCloseWhileRelocatingShards() throws Exception {
|
||||
final String[] indices = new String[randomIntBetween(3, 5)];
|
||||
final Map<String, Long> docsPerIndex = new HashMap<>();
|
||||
|
@ -119,21 +124,19 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
|||
|
||||
final String targetNode = internalCluster().startDataOnlyNode();
|
||||
ensureClusterSizeConsistency(); // wait for the master to finish processing join.
|
||||
final MockTransportService targetTransportService =
|
||||
(MockTransportService) internalCluster().getInstance(TransportService.class, targetNode);
|
||||
|
||||
final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
|
||||
try {
|
||||
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
|
||||
final ClusterState state = clusterService.state();
|
||||
final CountDownLatch latch = new CountDownLatch(indices.length);
|
||||
final CountDownLatch release = new CountDownLatch(1);
|
||||
final CountDownLatch release = new CountDownLatch(indices.length);
|
||||
|
||||
// relocate one shard for every index to be closed
|
||||
final AllocationCommands commands = new AllocationCommands();
|
||||
for (final String index : indices) {
|
||||
final NumShards numShards = getNumShards(index);
|
||||
final int shardId = numShards.numPrimaries == 1 ? 0 : randomIntBetween(0, numShards.numPrimaries - 1);
|
||||
final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
|
||||
final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
|
||||
|
||||
final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
|
||||
assertTrue(primary.started());
|
||||
|
@ -146,24 +149,49 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
|||
currentNodeId = replica.currentNodeId();
|
||||
}
|
||||
}
|
||||
commands.add(new MoveAllocationCommand(index, shardId, state.nodes().resolveNode(currentNodeId).getName(), targetNode));
|
||||
}
|
||||
|
||||
final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId());
|
||||
targetTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
|
||||
logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId());
|
||||
latch.countDown();
|
||||
try {
|
||||
release.await();
|
||||
logger.debug("releasing recovery of shard {}", ((StartRecoveryRequest) request).shardId());
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
// Build the list of shards for which recoveries will be blocked
|
||||
final Set<ShardId> blockedShards = commands.commands().stream()
|
||||
.map(c -> (MoveAllocationCommand) c)
|
||||
.map(c -> new ShardId(clusterService.state().metaData().index(c.index()).getIndex(), c.shardId()))
|
||||
.collect(Collectors.toSet());
|
||||
assertThat(blockedShards, hasSize(indices.length));
|
||||
|
||||
final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
|
||||
final Set<String> interruptedRecoveries = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
// Create a SendRequestBehavior that will block outgoing start recovery request
|
||||
final StubbableTransport.SendRequestBehavior sendBehavior = (connection, requestId, action, request, options) -> {
|
||||
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
|
||||
final StartRecoveryRequest startRecoveryRequest = ((StartRecoveryRequest) request);
|
||||
if (blockedShards.contains(startRecoveryRequest.shardId())) {
|
||||
logger.debug("blocking recovery of shard {}", startRecoveryRequest.shardId());
|
||||
latch.countDown();
|
||||
try {
|
||||
release.await();
|
||||
logger.debug("releasing recovery of shard {}", startRecoveryRequest.shardId());
|
||||
} catch (final InterruptedException e) {
|
||||
logger.warn(() -> new ParameterizedMessage("exception when releasing recovery of shard {}",
|
||||
startRecoveryRequest.shardId()), e);
|
||||
interruptedRecoveries.add(startRecoveryRequest.shardId().getIndexName());
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
);
|
||||
commands.add(new MoveAllocationCommand(index, shardId, currentNodeId, targetNode));
|
||||
}
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
};
|
||||
|
||||
final MockTransportService targetTransportService =
|
||||
(MockTransportService) internalCluster().getInstance(TransportService.class, targetNode);
|
||||
|
||||
for (DiscoveryNode node : state.getNodes()) {
|
||||
if (node.isDataNode() && node.getName().equals(targetNode) == false) {
|
||||
final TransportService sourceTransportService = internalCluster().getInstance(TransportService.class, node.getName());
|
||||
targetTransportService.addSendBehavior(sourceTransportService, sendBehavior);
|
||||
}
|
||||
}
|
||||
|
||||
assertAcked(client().admin().cluster().reroute(new ClusterRerouteRequest().commands(commands)).get());
|
||||
|
@ -222,12 +250,15 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
|||
|
||||
targetTransportService.clearAllRules();
|
||||
|
||||
// If a shard recovery has been interrupted, we expect its index to be closed
|
||||
interruptedRecoveries.forEach(CloseIndexIT::assertIndexIsClosed);
|
||||
|
||||
assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0));
|
||||
assertAcked(client().admin().indices().prepareOpen("index-*"));
|
||||
ensureGreen(indices);
|
||||
|
||||
for (String index : acknowledgedCloses) {
|
||||
long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value;
|
||||
long docsCount = client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get().getHits().getTotalHits().value;
|
||||
assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount
|
||||
+ " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue