diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java index d9cb3dc1553..235bb04d3af 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java @@ -19,10 +19,6 @@ package org.elasticsearch.index.seqno; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -43,7 +39,6 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; @@ -200,29 +195,19 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase { afterIndexing.accept(client()); assertBusy(() -> { - final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); - final IndexStats indexStats = stats.getIndex("test"); - for (final IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { - Optional maybePrimary = - Stream.of(indexShardStats.getShards()) - .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) - .findFirst(); - if (!maybePrimary.isPresent()) { - continue; - } - final ShardStats primary = maybePrimary.get(); - final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); - for (final ShardStats shardStats : indexShardStats) { - final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); - if (seqNoStats == null) { - // the shard is initializing - continue; + for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) { + for (IndexService indexService : indicesService) { + for (IndexShard shard : indexService) { + if (shard.routingEntry().primary()) { + final SeqNoStats seqNoStats = shard.seqNoStats(); + assertThat("shard " + shard.routingEntry() + " seq_no [" + seqNoStats + "]", + seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + } } - assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); } } - }, 30, TimeUnit.SECONDS); - + }, 60, TimeUnit.SECONDS); + ensureGreen("test"); for (final Thread thread : threads) { thread.join(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c1d08ae8b8f..e8a26a05e23 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -107,6 +107,7 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.MockTransportClient; @@ -2492,14 +2493,20 @@ public final class InternalTestCluster extends TestCluster { for (NodeAndClient nodeAndClient : nodes.values()) { CircuitBreaker inFlightRequestsBreaker = getInstance(CircuitBreakerService.class, nodeAndClient.name) .getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + TaskManager taskManager = getInstance(TransportService.class, nodeAndClient.name).getTaskManager(); try { // see #ensureEstimatedStats() assertBusy(() -> { // ensure that our size accounting on transport level is reset properly long bytesUsed = inFlightRequestsBreaker.getUsed(); - assertThat("All incoming requests on node [" + nodeAndClient.name + "] should have finished. Expected 0 but got " + - bytesUsed, bytesUsed, equalTo(0L)); - }); + if (bytesUsed != 0) { + String pendingTasks = taskManager.getTasks().values().stream() + .map(t -> t.taskInfo(nodeAndClient.name, true).toString()) + .collect(Collectors.joining(",", "[", "]")); + throw new AssertionError("All incoming requests on node [" + nodeAndClient.name + "] should have finished. " + + "Expected 0 but got " + bytesUsed + "; pending tasks [" + pendingTasks + "]"); + } + }, 1, TimeUnit.MINUTES); } catch (Exception e) { logger.error("Could not assert finished requests within timeout", e); fail("Could not assert finished requests within timeout on node [" + nodeAndClient.name + "]");