diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 1f51ad495e1..322e2a128c9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -20,12 +20,15 @@ package org.elasticsearch.test; import com.carrotsearch.hppc.ObjectLongMap; +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.annotations.TestGroup; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.http.HttpHost; import org.apache.lucene.search.Sort; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -48,10 +51,6 @@ import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -187,7 +186,6 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; @@ -2328,40 +2326,48 @@ public abstract class ESIntegTestCase extends ESTestCase { protected void assertSeqNos() throws Exception { assertBusy(() -> { - IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); - for (IndexStats indexStats : stats.getIndices().values()) { - for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { - Optional maybePrimary = Stream.of(indexShardStats.getShards()) - .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) - .findFirst(); - if (maybePrimary.isPresent() == false) { + final ClusterState state = clusterService().state(); + for (ObjectObjectCursor indexRoutingTable : state.routingTable().indicesRouting()) { + for (IntObjectCursor indexShardRoutingTable : indexRoutingTable.value.shards()) { + ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard(); + if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) { continue; } - ShardStats primary = maybePrimary.get(); - final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); - final ShardRouting primaryShardRouting = primary.getShardRouting(); + DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId()); + IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName()) + .indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id()); + final SeqNoStats primarySeqNoStats; + final ObjectLongMap syncGlobalCheckpoints; + try { + primarySeqNoStats = primaryShard.seqNoStats(); + syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints(); + } catch (AlreadyClosedException ex) { + continue; // shard is closed - just ignore + } assertThat(primaryShardRouting + " should have set the global checkpoint", - primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); - final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); - final IndicesService indicesService = - internalCluster().getInstance(IndicesService.class, node.getName()); - final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); - final ObjectLongMap globalCheckpoints = indexShard.getInSyncGlobalCheckpoints(); - for (ShardStats shardStats : indexShardStats) { - final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); - if (seqNoStats == null) { - continue; // this shard was closed + primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); + for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) { + if (replicaShardRouting.assignedToNode() == false) { + continue; } - assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", - seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); - assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", - seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); - assertThat(shardStats.getShardRouting() + " max seq no mismatch", - seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); + DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId()); + IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName()) + .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id()); + final SeqNoStats seqNoStats; + try { + seqNoStats = replicaShard.seqNoStats(); + } catch (AlreadyClosedException e) { + continue; // shard is closed - just ignore + } + assertThat(replicaShardRouting + " local checkpoint mismatch", + seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); + assertThat(replicaShardRouting + " global checkpoint mismatch", + seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); + assertThat(replicaShardRouting + " max seq no mismatch", + seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard - assertThat( - seqNoStats.getGlobalCheckpoint(), - equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); + assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(), + equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))); } } }