TEST: Access cluster state directly in assertSeqNos (#33277)
Some AbstractDisruptionTestCase tests started failing after we enabled assertSeqNos (in #33130). They fail because the assertSeqNos assertion queries cluster stats while the cluster is disrupted or not yet formed. This commit switches to using the cluster state and shard stats directly from the test cluster. Closes #33251
This commit is contained in:
parent
8a2d154bad
commit
39839f97ef
|
@@ -20,12 +20,15 @@
|
|||
package org.elasticsearch.test;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectLongMap;
|
||||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import com.carrotsearch.randomizedtesting.RandomizedContext;
|
||||
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
|
@@ -48,10 +51,6 @@ import org.elasticsearch.action.admin.indices.segments.IndexSegments;
|
|||
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
|
||||
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
|
@@ -187,7 +186,6 @@ import java.util.IdentityHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@@ -2328,40 +2326,48 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
|
||||
protected void assertSeqNos() throws Exception {
|
||||
assertBusy(() -> {
|
||||
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
|
||||
for (IndexStats indexStats : stats.getIndices().values()) {
|
||||
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
|
||||
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
|
||||
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
|
||||
.findFirst();
|
||||
if (maybePrimary.isPresent() == false) {
|
||||
final ClusterState state = clusterService().state();
|
||||
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
|
||||
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
|
||||
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
|
||||
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
|
||||
continue;
|
||||
}
|
||||
ShardStats primary = maybePrimary.get();
|
||||
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
|
||||
final ShardRouting primaryShardRouting = primary.getShardRouting();
|
||||
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
|
||||
IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName())
|
||||
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
|
||||
final SeqNoStats primarySeqNoStats;
|
||||
final ObjectLongMap<String> syncGlobalCheckpoints;
|
||||
try {
|
||||
primarySeqNoStats = primaryShard.seqNoStats();
|
||||
syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints();
|
||||
} catch (AlreadyClosedException ex) {
|
||||
continue; // shard is closed - just ignore
|
||||
}
|
||||
assertThat(primaryShardRouting + " should have set the global checkpoint",
|
||||
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
|
||||
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
|
||||
final IndicesService indicesService =
|
||||
internalCluster().getInstance(IndicesService.class, node.getName());
|
||||
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
|
||||
final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
|
||||
for (ShardStats shardStats : indexShardStats) {
|
||||
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
|
||||
if (seqNoStats == null) {
|
||||
continue; // this shard was closed
|
||||
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
|
||||
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
|
||||
if (replicaShardRouting.assignedToNode() == false) {
|
||||
continue;
|
||||
}
|
||||
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
|
||||
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
|
||||
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
|
||||
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
|
||||
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
|
||||
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
|
||||
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
|
||||
IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName())
|
||||
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
|
||||
final SeqNoStats seqNoStats;
|
||||
try {
|
||||
seqNoStats = replicaShard.seqNoStats();
|
||||
} catch (AlreadyClosedException e) {
|
||||
continue; // shard is closed - just ignore
|
||||
}
|
||||
assertThat(replicaShardRouting + " local checkpoint mismatch",
|
||||
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
|
||||
assertThat(replicaShardRouting + " global checkpoint mismatch",
|
||||
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
|
||||
assertThat(replicaShardRouting + " max seq no mismatch",
|
||||
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
|
||||
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
|
||||
assertThat(
|
||||
seqNoStats.getGlobalCheckpoint(),
|
||||
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
|
||||
assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(),
|
||||
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue