Improve CloseWhileRelocatingShardsIT (#37348)

Tanguy Leroux committed on 2019-01-14 13:14:36 +01:00 (committed by GitHub)
parent 6ca076bf74
commit 07dc8c7eee
1 changed file with 100 additions and 59 deletions
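At the heart of the change: instead of spawning one reroute thread per shard that races against the close calls, the test now registers a send behavior on the target node's MockTransportService that stalls every outgoing START_RECOVERY request until the closing threads are ready, then releases the recoveries. A minimal sketch of that pattern, condensed from the diff below (targetNode, sourceNode and indices are fixtures created in the full test body):

    // Sketch only: latch/release wiring as in the test body further down.
    final CountDownLatch latch = new CountDownLatch(indices.length); // one blocked recovery per index
    final CountDownLatch release = new CountDownLatch(1);            // counted down by the closing threads

    ((MockTransportService) internalCluster().getInstance(TransportService.class, targetNode))
        .addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
            (connection, requestId, action, request, options) -> {
                if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
                    latch.countDown();   // signal that this relocation is now blocked
                    try {
                        release.await(); // park the recovery until the closes have been issued
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }
                }
                connection.sendRequest(requestId, action, request, options); // then let it through
            });

This way every close request is guaranteed to overlap an in-flight relocation, which is the scenario the test is meant to exercise.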


@@ -18,29 +18,40 @@
  */
 package org.elasticsearch.indices.state;
 
-import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance;
+import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
+import org.elasticsearch.indices.recovery.StartRecoveryRequest;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
-import static org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING;
-import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
-import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
 import static java.util.Collections.singletonList;
 import static org.elasticsearch.indices.state.CloseIndexIT.assertException;
 import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
 import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
@@ -50,36 +61,52 @@ import static org.hamcrest.Matchers.greaterThan;
 @ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
 public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return singletonList(MockTransportService.TestPlugin.class);
     }
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal))
-            .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 10)
-            .put(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
+            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
+            .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
             .build();
     }
 
     @Override
-    protected int numberOfReplicas() {
-        return 1;
+    protected int maximumNumberOfShards() {
+        return 3;
     }
 
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37274")
     @TestLogging("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG")
     public void testCloseWhileRelocatingShards() throws Exception {
-        final String[] indices = new String[randomIntBetween(1, 3)];
+        final String[] indices = new String[randomIntBetween(3, 5)];
         final Map<String, Long> docsPerIndex = new HashMap<>();
+        final Map<String, BackgroundIndexer> indexers = new HashMap<>();
 
         for (int i = 0; i < indices.length; i++) {
-            final String indexName = "index-" + i;
-            createIndex(indexName);
+            final String indexName = "index-" + i;
             int nbDocs = 0;
-            if (randomBoolean()) {
-                nbDocs = randomIntBetween(1, 20);
-                for (int j = 0; j < nbDocs; j++) {
-                    IndexResponse indexResponse = client().prepareIndex(indexName, "_doc").setSource("num", j).get();
-                    assertEquals(RestStatus.CREATED, indexResponse.status());
-                }
-            }
+            switch (i) {
+                case 0:
+                    logger.debug("creating empty index {}", indexName);
+                    createIndex(indexName);
+                    break;
+                case 1:
+                    nbDocs = scaledRandomIntBetween(1, 100);
+                    logger.debug("creating index {} with {} documents", indexName, nbDocs);
+                    createIndex(indexName);
+                    indexRandom(randomBoolean(), IntStream.range(0, nbDocs)
+                        .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n))
+                        .collect(Collectors.toList()));
+                    break;
+                default:
+                    logger.debug("creating index {} with background indexing", indexName);
+                    final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), -1, 1);
+                    indexers.put(indexName, indexer);
+                    waitForDocs(1, indexer);
+            }
             docsPerIndex.put(indexName, (long) nbDocs);
             indices[i] = indexName;
@@ -88,60 +115,72 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
         ensureGreen(indices);
 
         assertAcked(client().admin().cluster().prepareUpdateSettings()
             .setTransientSettings(Settings.builder()
-                .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.toString())));
+                .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString())));
 
-        // start some concurrent indexing threads
-        final Map<String, BackgroundIndexer> indexers = new HashMap<>();
-        for (final String index : indices) {
-            if (randomBoolean()) {
-                final BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), -1, scaledRandomIntBetween(1, 3));
-                waitForDocs(1, indexer);
-                indexers.put(index, indexer);
-            }
-        }
+        final String targetNode = internalCluster().startDataOnlyNode();
+        ensureClusterSizeConsistency(); // wait for the master to finish processing join.
 
         final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
-        final String newNode = internalCluster().startDataOnlyNode();
         try {
-            final CountDownLatch latch = new CountDownLatch(1);
-            final List<Thread> threads = new ArrayList<>();
+            final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
+            final CountDownLatch latch = new CountDownLatch(indices.length);
+            final CountDownLatch release = new CountDownLatch(1);
 
-            // start shards relocating threads
-            final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
-            for (final String indexToRelocate : indices) {
-                final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexToRelocate);
-                for (int i = 0; i < getNumShards(indexToRelocate).numPrimaries; i++) {
-                    final int shardId = i;
-                    ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
-                    assertTrue(primary.started());
-                    ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
+            // relocate one shard for every index to be closed
+            final AllocationCommands commands = new AllocationCommands();
+            for (final String index : indices) {
+                final NumShards numShards = getNumShards(index);
+                final int shardId = numShards.numPrimaries == 1 ? 0 : randomIntBetween(0, numShards.numPrimaries - 1);
+                final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
+                final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
+                assertTrue(primary.started());
+                String currentNodeId = primary.currentNodeId();
+
+                if (numShards.numReplicas > 0) {
+                    final ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
                     assertTrue(replica.started());
-                    final String currentNodeId = randomBoolean() ? primary.currentNodeId() : replica.currentNodeId();
-                    assertNotNull(currentNodeId);
-
-                    final Thread thread = new Thread(() -> {
-                        try {
-                            latch.await();
-                        } catch (InterruptedException e) {
-                            throw new AssertionError(e);
-                        }
-                        assertAcked(client().admin().cluster().prepareReroute()
-                            .add(new MoveAllocationCommand(indexToRelocate, shardId, currentNodeId, newNode)));
-                    });
-                    threads.add(thread);
-                    thread.start();
+                    if (randomBoolean()) {
+                        currentNodeId = replica.currentNodeId();
+                    }
                 }
+
+                final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId());
+                ((MockTransportService) internalCluster().getInstance(TransportService.class, targetNode))
+                    .addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
+                        (connection, requestId, action, request, options) -> {
+                            if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
+                                logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId());
+                                latch.countDown();
+                                try {
+                                    release.await();
+                                    logger.debug("releasing recovery of shard {}", ((StartRecoveryRequest) request).shardId());
+                                } catch (InterruptedException e) {
+                                    throw new AssertionError(e);
+                                }
+                            }
+                            connection.sendRequest(requestId, action, request, options);
+                        }
+                    );
+                commands.add(new MoveAllocationCommand(index, shardId, currentNodeId, targetNode));
             }
 
+            assertAcked(client().admin().cluster().reroute(new ClusterRerouteRequest().commands(commands)).get());
+
+            // start index closing threads
+            final List<Thread> threads = new ArrayList<>();
             for (final String indexToClose : indices) {
                 final Thread thread = new Thread(() -> {
                     try {
                         latch.await();
                     } catch (InterruptedException e) {
                         throw new AssertionError(e);
+                    } finally {
+                        release.countDown();
                     }
                     // Closing is not always acknowledged when shards are relocating: this is the case when the target shard is initializing
                     // or is catching up operations. In these cases the TransportVerifyShardBeforeCloseAction will detect that the global
                     // and max sequence number don't match and will not ack the close.
                     AcknowledgedResponse closeResponse = client().admin().indices().prepareClose(indexToClose).get();
                     if (closeResponse.isAcknowledged()) {
                         assertTrue("Index closing should not be acknowledged twice", acknowledgedCloses.add(indexToClose));
@@ -155,6 +194,7 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
             for (Thread thread : threads) {
                 thread.join();
             }
 
             for (Map.Entry<String, BackgroundIndexer> entry : indexers.entrySet()) {
                 final BackgroundIndexer indexer = entry.getValue();
                 indexer.setAssertNoFailuresOnStop(false);
@@ -172,7 +212,8 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
             }
         } finally {
             assertAcked(client().admin().cluster().prepareUpdateSettings()
-                .setTransientSettings(Settings.builder().putNull(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
+                .setTransientSettings(Settings.builder()
+                    .putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
         }
 
         for (String index : indices) {