From 52266d8b115ce8633e57ff1e5abfbb951496e67b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 23 Oct 2018 21:08:34 -0400 Subject: [PATCH] TEST: Clone replicas list when compute replication targets (#34728) In #34407, we supposed to clone the list of replicas of ReplicationGroup when computing replication targets, but somehow we missed it. If we don't clone the list, a WriteReplicationAction may use an old ReplicationTargets which consists replicas which are removed from the current list of replicas Relates #34407 Closes #33457 --- .../ESIndexLevelReplicationTestCase.java | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index c4881d06351..60a7655e9ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -79,6 +79,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -295,6 +296,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; replicas.add(replica); + if (replicationTargets != null) { + replicationTargets.addReplica(replica); + } updateAllocationIDsOnPrimary(); } @@ -310,6 +314,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER); replicas.add(newReplica); + if (replicationTargets != null) { + replicationTargets.addReplica(newReplica); + } updateAllocationIDsOnPrimary(); return newReplica; } @@ -496,7 +503,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } private synchronized void computeReplicationTargets() { - this.replicationTargets = new ReplicationTargets(primary, replicas); + this.replicationTargets = new ReplicationTargets(this.primary, new ArrayList<>(this.replicas)); } private synchronized ReplicationTargets getReplicationTargets() { @@ -510,7 +517,25 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase ReplicationTargets(IndexShard primary, List replicas) { this.primary = primary; - this.replicas = Collections.unmodifiableList(replicas); + this.replicas = replicas; + } + + /** + * This does not modify the replication targets, but only adds a replica to the list. + * If the targets is updated to include the given replica, a replication action would + * be able to find this replica to execute write requests on it. + */ + synchronized void addReplica(IndexShard replica) { + replicas.add(replica); + } + + synchronized IndexShard findReplicaShard(ShardRouting replicaRouting) { + for (IndexShard replica : replicas) { + if (replica.routingEntry().isSameAllocation(replicaRouting)) { + return replica; + } + } + throw new AssertionError("replica [" + replicaRouting + "] is not found; replicas[" + replicas + "] primary[" + primary + "]"); } } @@ -614,8 +639,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener listener) { - IndexShard replica = replicationTargets.replicas.stream() - .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); + IndexShard replica = replicationTargets.findReplicaShard(replicaRouting); replica.acquireReplicaOperationPermit( getPrimaryShard().getPendingPrimaryTerm(), globalCheckpoint,