diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
index 165360c3597..555bf243354 100644
--- a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
@@ -18,29 +18,40 @@
  */
 package org.elasticsearch.indices.state;
 
-import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance;
+import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
+import org.elasticsearch.indices.recovery.StartRecoveryRequest;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
-import static org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING;
-import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
-import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
+import static java.util.Collections.singletonList;
 import static org.elasticsearch.indices.state.CloseIndexIT.assertException;
 import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
 import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
@@ -50,36 +61,52 @@ import static org.hamcrest.Matchers.greaterThan;
 @ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
 public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
 
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return singletonList(MockTransportService.TestPlugin.class);
+    }
+
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal))
-            .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 10)
-            .put(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
+            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
+            .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
             .build();
     }
 
     @Override
-    protected int numberOfReplicas() {
-        return 1;
+    protected int maximumNumberOfShards() {
+        return 3;
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37274")
+    @TestLogging("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG")
     public void testCloseWhileRelocatingShards() throws Exception {
-        final String[] indices = new String[randomIntBetween(1, 3)];
+        final String[] indices = new String[randomIntBetween(3, 5)];
         final Map<String, Long> docsPerIndex = new HashMap<>();
+        final Map<String, BackgroundIndexer> indexers = new HashMap<>();
 
         for (int i = 0; i < indices.length; i++) {
-            final String indexName = "index-" + i;
-            createIndex(indexName);
-
+            final String indexName = "index-" + i;
             int nbDocs = 0;
-            if (randomBoolean()) {
-                nbDocs = randomIntBetween(1, 20);
-                for (int j = 0; j < nbDocs; j++) {
-                    IndexResponse indexResponse = client().prepareIndex(indexName, "_doc").setSource("num", j).get();
-                    assertEquals(RestStatus.CREATED, indexResponse.status());
-                }
+            switch (i) {
+                case 0:
+                    logger.debug("creating empty index {}", indexName);
+                    createIndex(indexName);
+                    break;
+                case 1:
+                    nbDocs = scaledRandomIntBetween(1, 100);
+                    logger.debug("creating index {} with {} documents", indexName, nbDocs);
+                    createIndex(indexName);
+                    indexRandom(randomBoolean(), IntStream.range(0, nbDocs)
+                        .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n))
+                        .collect(Collectors.toList()));
+                    break;
+                default:
+                    logger.debug("creating index {} with background indexing", indexName);
+                    final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), -1, 1);
+                    indexers.put(indexName, indexer);
+                    waitForDocs(1, indexer);
             }
             docsPerIndex.put(indexName, (long) nbDocs);
             indices[i] = indexName;
@@ -88,60 +115,72 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
         ensureGreen(indices);
 
         assertAcked(client().admin().cluster().prepareUpdateSettings()
             .setTransientSettings(Settings.builder()
-                .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.toString())));
+                .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString())));
 
-        // start some concurrent indexing threads
-        final Map<String, BackgroundIndexer> indexers = new HashMap<>();
-        for (final String index : indices) {
-            if (randomBoolean()) {
-                final BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), -1, scaledRandomIntBetween(1, 3));
-                waitForDocs(1, indexer);
-                indexers.put(index, indexer);
-            }
-        }
+        final String targetNode = internalCluster().startDataOnlyNode();
+        ensureClusterSizeConsistency(); // wait for the master to finish processing join.
 
         final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
-        final String newNode = internalCluster().startDataOnlyNode();
         try {
-            final CountDownLatch latch = new CountDownLatch(1);
-            final List<Thread> threads = new ArrayList<>();
+            final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
+            final CountDownLatch latch = new CountDownLatch(indices.length);
+            final CountDownLatch release = new CountDownLatch(1);
 
-            // start shards relocating threads
-            final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
-            for (final String indexToRelocate : indices) {
-                final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexToRelocate);
-                for (int i = 0; i < getNumShards(indexToRelocate).numPrimaries; i++) {
-                    final int shardId = i;
-                    ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
-                    assertTrue(primary.started());
-                    ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
+            // relocate one shard for every index to be closed
+            final AllocationCommands commands = new AllocationCommands();
+            for (final String index : indices) {
+                final NumShards numShards = getNumShards(index);
+                final int shardId = numShards.numPrimaries == 1 ? 0 : randomIntBetween(0, numShards.numPrimaries - 1);
+                final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
+
+                final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
+                assertTrue(primary.started());
+
+                String currentNodeId = primary.currentNodeId();
+                if (numShards.numReplicas > 0) {
+                    final ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
                     assertTrue(replica.started());
-
-                    final String currentNodeId = randomBoolean() ? primary.currentNodeId() : replica.currentNodeId();
-                    assertNotNull(currentNodeId);
-
-                    final Thread thread = new Thread(() -> {
-                        try {
-                            latch.await();
-                        } catch (InterruptedException e) {
-                            throw new AssertionError(e);
-                        }
-                        assertAcked(client().admin().cluster().prepareReroute()
-                            .add(new MoveAllocationCommand(indexToRelocate, shardId, currentNodeId, newNode)));
-                    });
-                    threads.add(thread);
-                    thread.start();
+                    if (randomBoolean()) {
+                        currentNodeId = replica.currentNodeId();
+                    }
                 }
+
+                final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId());
+                ((MockTransportService) internalCluster().getInstance(TransportService.class, targetNode))
+                    .addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
+                        (connection, requestId, action, request, options) -> {
+                            if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
+                                logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId());
+                                latch.countDown();
+                                try {
+                                    release.await();
+                                    logger.debug("releasing recovery of shard {}", ((StartRecoveryRequest) request).shardId());
+                                } catch (InterruptedException e) {
+                                    throw new AssertionError(e);
+                                }
+                            }
+                            connection.sendRequest(requestId, action, request, options);
+                        }
+                    );
+                commands.add(new MoveAllocationCommand(index, shardId, currentNodeId, targetNode));
             }
+            assertAcked(client().admin().cluster().reroute(new ClusterRerouteRequest().commands(commands)).get());
 
+            // start index closing threads
+            final List<Thread> threads = new ArrayList<>();
             for (final String indexToClose : indices) {
                 final Thread thread = new Thread(() -> {
                     try {
                         latch.await();
                     } catch (InterruptedException e) {
                         throw new AssertionError(e);
+                    } finally {
+                        release.countDown();
                     }
+                    // Closing is not always acknowledged when shards are relocating: this is the case when the target shard is initializing
+                    // or is catching up operations. In these cases the TransportVerifyShardBeforeCloseAction will detect that the global
+                    // and max sequence number don't match and will not ack the close.
                     AcknowledgedResponse closeResponse = client().admin().indices().prepareClose(indexToClose).get();
                     if (closeResponse.isAcknowledged()) {
                         assertTrue("Index closing should not be acknowledged twice", acknowledgedCloses.add(indexToClose));
@@ -155,6 +194,7 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
             for (Thread thread : threads) {
                 thread.join();
             }
+
             for (Map.Entry<String, BackgroundIndexer> entry : indexers.entrySet()) {
                 final BackgroundIndexer indexer = entry.getValue();
                 indexer.setAssertNoFailuresOnStop(false);
@@ -172,7 +212,8 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
             }
         } finally {
             assertAcked(client().admin().cluster().prepareUpdateSettings()
-                .setTransientSettings(Settings.builder().putNull(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
+                .setTransientSettings(Settings.builder()
+                    .putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
         }
 
         for (String index : indices) {
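
Note on the synchronization pattern in this change: the MockTransportService send behavior and the index-closing threads meet through two latches. Every intercepted START_RECOVERY request counts down "latch", so the closes only begin once a recovery is in flight for each index; the first close thread to wake then trips "release", letting the blocked recoveries resume while the closes race them. A minimal, self-contained sketch of that handshake with plain JDK primitives follows (class and variable names are illustrative, not Elasticsearch code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Stand-alone sketch: the "recovery" threads play the role of the blocked
// START_RECOVERY requests, the "close" threads play the index-closing threads.
public class TwoLatchHandshakeDemo {

    public static void main(String[] args) throws Exception {
        final int relocations = 3;
        // counted down once per blocked recovery ("latch" in the test)
        final CountDownLatch started = new CountDownLatch(relocations);
        // tripped by the close threads to unblock the recoveries ("release" in the test)
        final CountDownLatch release = new CountDownLatch(1);

        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < relocations; i++) {
            final int shard = i;

            final Thread recovery = new Thread(() -> {
                started.countDown(); // signal that this recovery is now held
                try {
                    release.await(); // stay blocked until the closes have started
                } catch (InterruptedException e) {
                    throw new AssertionError(e);
                }
                System.out.println("recovery of shard " + shard + " released");
            });
            threads.add(recovery);
            recovery.start();

            final Thread close = new Thread(() -> {
                try {
                    started.await(); // wait until every recovery is held
                } catch (InterruptedException e) {
                    throw new AssertionError(e);
                } finally {
                    release.countDown(); // then let the recoveries resume
                }
                System.out.println("closing index-" + shard + " while its shard relocates");
            });
            threads.add(close);
            close.start();
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }
}

Counting down "release" in a finally block mirrors the test: even if a close thread is interrupted, the blocked recoveries are still released, so the coordination cannot deadlock.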