From 5097071230e078304a35e591ec73584669bc529c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 3 Jun 2020 08:25:01 -0400 Subject: [PATCH] Increase timeout for GlobalCheckpointSyncIT (#57567) The test failed when it was running with 4 replicas and 3 indexing threads. The recovering replicas can prevent the global checkpoint from advancing. This commit increases the timeout to 60 seconds for this suite and the check for no inflight requests. Closes #57204 --- .../index/seqno/GlobalCheckpointSyncIT.java | 35 ++++++------------- .../test/InternalTestCluster.java | 13 +++++-- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java index d9cb3dc1553..235bb04d3af 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java @@ -19,10 +19,6 @@ package org.elasticsearch.index.seqno; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -43,7 +39,6 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -200,29 +195,19 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase { afterIndexing.accept(client()); assertBusy(() -> 
{ - final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); - final IndexStats indexStats = stats.getIndex("test"); - for (final IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { - Optional<ShardStats> maybePrimary = - Stream.of(indexShardStats.getShards()) - .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) - .findFirst(); - if (!maybePrimary.isPresent()) { - continue; - } - final ShardStats primary = maybePrimary.get(); - final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); - for (final ShardStats shardStats : indexShardStats) { - final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); - if (seqNoStats == null) { - // the shard is initializing - continue; + for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) { + for (IndexService indexService : indicesService) { + for (IndexShard shard : indexService) { + if (shard.routingEntry().primary()) { + final SeqNoStats seqNoStats = shard.seqNoStats(); + assertThat("shard " + shard.routingEntry() + " seq_no [" + seqNoStats + "]", + seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + } } - assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); } } - }, 30, TimeUnit.SECONDS); - + }, 60, TimeUnit.SECONDS); + ensureGreen("test"); for (final Thread thread : threads) { thread.join(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c1d08ae8b8f..e8a26a05e23 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -107,6 +107,7 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import 
org.elasticsearch.search.SearchService; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.MockTransportClient; @@ -2492,14 +2493,20 @@ public final class InternalTestCluster extends TestCluster { for (NodeAndClient nodeAndClient : nodes.values()) { CircuitBreaker inFlightRequestsBreaker = getInstance(CircuitBreakerService.class, nodeAndClient.name) .getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + TaskManager taskManager = getInstance(TransportService.class, nodeAndClient.name).getTaskManager(); try { // see #ensureEstimatedStats() assertBusy(() -> { // ensure that our size accounting on transport level is reset properly long bytesUsed = inFlightRequestsBreaker.getUsed(); - assertThat("All incoming requests on node [" + nodeAndClient.name + "] should have finished. Expected 0 but got " + - bytesUsed, bytesUsed, equalTo(0L)); - }); + if (bytesUsed != 0) { + String pendingTasks = taskManager.getTasks().values().stream() + .map(t -> t.taskInfo(nodeAndClient.name, true).toString()) + .collect(Collectors.joining(",", "[", "]")); + throw new AssertionError("All incoming requests on node [" + nodeAndClient.name + "] should have finished. " + + "Expected 0 but got " + bytesUsed + "; pending tasks [" + pendingTasks + "]"); + } + }, 1, TimeUnit.MINUTES); } catch (Exception e) { logger.error("Could not assert finished requests within timeout", e); fail("Could not assert finished requests within timeout on node [" + nodeAndClient.name + "]");