TEST: Clone replicas list when compute replication targets (#34728)
In #34407, we supposed to clone the list of replicas of ReplicationGroup when computing replication targets, but somehow we missed it. If we don't clone the list, a WriteReplicationAction may use an old ReplicationTargets which consists replicas which are removed from the current list of replicas Relates #34407 Closes #33457
This commit is contained in:
parent
8da1c9626a
commit
52266d8b11
|
@ -79,6 +79,7 @@ import org.elasticsearch.tasks.TaskManager;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
@ -295,6 +296,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
|
||||
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
|
||||
replicas.add(replica);
|
||||
if (replicationTargets != null) {
|
||||
replicationTargets.addReplica(replica);
|
||||
}
|
||||
updateAllocationIDsOnPrimary();
|
||||
}
|
||||
|
||||
|
@ -310,6 +314,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting),
|
||||
() -> {}, EMPTY_EVENT_LISTENER);
|
||||
replicas.add(newReplica);
|
||||
if (replicationTargets != null) {
|
||||
replicationTargets.addReplica(newReplica);
|
||||
}
|
||||
updateAllocationIDsOnPrimary();
|
||||
return newReplica;
|
||||
}
|
||||
|
@ -496,7 +503,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
}
|
||||
|
||||
private synchronized void computeReplicationTargets() {
|
||||
this.replicationTargets = new ReplicationTargets(primary, replicas);
|
||||
this.replicationTargets = new ReplicationTargets(this.primary, new ArrayList<>(this.replicas));
|
||||
}
|
||||
|
||||
private synchronized ReplicationTargets getReplicationTargets() {
|
||||
|
@ -510,7 +517,25 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
|
||||
ReplicationTargets(IndexShard primary, List<IndexShard> replicas) {
|
||||
this.primary = primary;
|
||||
this.replicas = Collections.unmodifiableList(replicas);
|
||||
this.replicas = replicas;
|
||||
}
|
||||
|
||||
/**
|
||||
* This does not modify the replication targets, but only adds a replica to the list.
|
||||
* If the targets is updated to include the given replica, a replication action would
|
||||
* be able to find this replica to execute write requests on it.
|
||||
*/
|
||||
synchronized void addReplica(IndexShard replica) {
|
||||
replicas.add(replica);
|
||||
}
|
||||
|
||||
synchronized IndexShard findReplicaShard(ShardRouting replicaRouting) {
|
||||
for (IndexShard replica : replicas) {
|
||||
if (replica.routingEntry().isSameAllocation(replicaRouting)) {
|
||||
return replica;
|
||||
}
|
||||
}
|
||||
throw new AssertionError("replica [" + replicaRouting + "] is not found; replicas[" + replicas + "] primary[" + primary + "]");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -614,8 +639,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
final long globalCheckpoint,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
|
||||
IndexShard replica = replicationTargets.replicas.stream()
|
||||
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
|
||||
IndexShard replica = replicationTargets.findReplicaShard(replicaRouting);
|
||||
replica.acquireReplicaOperationPermit(
|
||||
getPrimaryShard().getPendingPrimaryTerm(),
|
||||
globalCheckpoint,
|
||||
|
|
Loading…
Reference in New Issue