Increase timeout for GlobalCheckpointSyncIT (#57567)

The test failed when it was running with 4 replicas and 3 indexing 
threads. The recovering replicas can prevent the global checkpoint from
advancing. This commit increases the timeout to 60 seconds for this
suite and the check for no inflight requests.

Closes #57204
This commit is contained in:
Nhat Nguyen 2020-06-03 08:25:01 -04:00
parent e2c0c4197f
commit 5097071230
2 changed files with 20 additions and 28 deletions

View File

@ -19,10 +19,6 @@
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -43,7 +39,6 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@ -200,29 +195,19 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase {
afterIndexing.accept(client());
assertBusy(() -> {
final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
final IndexStats indexStats = stats.getIndex("test");
for (final IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
Optional<ShardStats> maybePrimary =
Stream.of(indexShardStats.getShards())
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
.findFirst();
if (!maybePrimary.isPresent()) {
continue;
}
final ShardStats primary = maybePrimary.get();
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
for (final ShardStats shardStats : indexShardStats) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
if (seqNoStats == null) {
// the shard is initializing
continue;
}
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
if (shard.routingEntry().primary()) {
final SeqNoStats seqNoStats = shard.seqNoStats();
assertThat("shard " + shard.routingEntry() + " seq_no [" + seqNoStats + "]",
seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
}
}
}, 30, TimeUnit.SECONDS);
}
}
}, 60, TimeUnit.SECONDS);
ensureGreen("test");
for (final Thread thread : threads) {
thread.join();
}

View File

@ -107,6 +107,7 @@ import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.MockTransportClient;
@ -2492,14 +2493,20 @@ public final class InternalTestCluster extends TestCluster {
for (NodeAndClient nodeAndClient : nodes.values()) {
CircuitBreaker inFlightRequestsBreaker = getInstance(CircuitBreakerService.class, nodeAndClient.name)
.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
TaskManager taskManager = getInstance(TransportService.class, nodeAndClient.name).getTaskManager();
try {
// see #ensureEstimatedStats()
assertBusy(() -> {
// ensure that our size accounting on transport level is reset properly
long bytesUsed = inFlightRequestsBreaker.getUsed();
assertThat("All incoming requests on node [" + nodeAndClient.name + "] should have finished. Expected 0 but got " +
bytesUsed, bytesUsed, equalTo(0L));
});
if (bytesUsed != 0) {
String pendingTasks = taskManager.getTasks().values().stream()
.map(t -> t.taskInfo(nodeAndClient.name, true).toString())
.collect(Collectors.joining(",", "[", "]"));
throw new AssertionError("All incoming requests on node [" + nodeAndClient.name + "] should have finished. " +
"Expected 0 but got " + bytesUsed + "; pending tasks [" + pendingTasks + "]");
}
}, 1, TimeUnit.MINUTES);
} catch (Exception e) {
logger.error("Could not assert finished requests within timeout", e);
fail("Could not assert finished requests within timeout on node [" + nodeAndClient.name + "]");