diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 58fea953850..c4881d06351 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -164,6 +164,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase private final AtomicInteger replicaId = new AtomicInteger(); private final AtomicInteger docId = new AtomicInteger(); boolean closed = false; + private ReplicationTargets replicationTargets; + private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), (request, parentTask, primaryAllocationId, primaryTerm, listener) -> { @@ -277,6 +279,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase for (final IndexShard replica : replicas) { recoverReplica(replica); } + computeReplicationTargets(); } public IndexShard addReplica() throws IOException { @@ -320,8 +323,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase */ public Future promoteReplicaToPrimary(IndexShard replica) throws IOException { PlainActionFuture fut = new PlainActionFuture<>(); - promoteReplicaToPrimary(replica, - (shard, listener) -> primaryReplicaSyncer.resync(shard, + promoteReplicaToPrimary(replica, (shard, listener) -> { + computeReplicationTargets(); + primaryReplicaSyncer.resync(shard, new ActionListener() { @Override public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) { @@ -334,7 +338,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase listener.onFailure(e); fut.onFailure(e); } - })); + }); + }); return fut; } @@ -370,6 +375,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final boolean removed = replicas.remove(replica); if (removed) { updateAllocationIDsOnPrimary(); + computeReplicationTargets(); } return removed; } @@ -392,6 +398,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable); ESIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); + computeReplicationTargets(); } public synchronized DiscoveryNode getPrimaryNode() { @@ -468,6 +475,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase public synchronized void reinitPrimaryShard() throws IOException { primary = reinitShard(primary); + computeReplicationTargets(); } public void syncGlobalCheckpoint() { @@ -486,6 +494,24 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase currentClusterStateVersion.incrementAndGet(), activeIds(), routingTable(Function.identity()), Collections.emptySet()); } + + private synchronized void computeReplicationTargets() { + this.replicationTargets = new ReplicationTargets(primary, replicas); + } + + private synchronized ReplicationTargets getReplicationTargets() { + return replicationTargets; + } + } + + static final class ReplicationTargets { + final IndexShard primary; + final List replicas; + + ReplicationTargets(IndexShard primary, List replicas) { + this.primary = primary; + this.replicas = Collections.unmodifiableList(replicas); + } } protected abstract class ReplicationAction, @@ -493,13 +519,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase Response extends ReplicationResponse> { private final Request request; private ActionListener listener; - private final ReplicationGroup replicationGroup; + private final ReplicationTargets replicationTargets; private final String opType; protected ReplicationAction(Request request, ActionListener listener, ReplicationGroup group, String opType) { this.request = request; this.listener = listener; - this.replicationGroup = group; + this.replicationTargets = group.getReplicationTargets(); this.opType = opType; } @@ -523,7 +549,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } IndexShard getPrimaryShard() { - return replicationGroup.primary; + return replicationTargets.primary; } protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; @@ -534,7 +560,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase @Override public ShardRouting routingEntry() { - return replicationGroup.primary.routingEntry(); + return getPrimaryShard().routingEntry(); } @Override @@ -544,37 +570,37 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase @Override public PrimaryResult perform(Request request) throws Exception { - return performOnPrimary(replicationGroup.primary, request); + return performOnPrimary(getPrimaryShard(), request); } @Override public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { - replicationGroup.getPrimary().updateLocalCheckpointForShard(allocationId, checkpoint); + getPrimaryShard().updateLocalCheckpointForShard(allocationId, checkpoint); } @Override public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) { - replicationGroup.getPrimary().updateGlobalCheckpointForShard(allocationId, globalCheckpoint); + getPrimaryShard().updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } @Override public long localCheckpoint() { - return replicationGroup.getPrimary().getLocalCheckpoint(); + return getPrimaryShard().getLocalCheckpoint(); } @Override public long globalCheckpoint() { - return replicationGroup.getPrimary().getGlobalCheckpoint(); + return getPrimaryShard().getGlobalCheckpoint(); } @Override public long maxSeqNoOfUpdatesOrDeletes() { - return replicationGroup.getPrimary().getMaxSeqNoOfUpdatesOrDeletes(); + return getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes(); } @Override public org.elasticsearch.index.shard.ReplicationGroup getReplicationGroup() { - return replicationGroup.primary.getReplicationGroup(); + return getPrimaryShard().getReplicationGroup(); } } @@ -588,10 +614,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener listener) { - IndexShard replica = replicationGroup.replicas.stream() + IndexShard replica = replicationTargets.replicas.stream() .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); replica.acquireReplicaOperationPermit( - replicationGroup.primary.getPendingPrimaryTerm(), + getPrimaryShard().getPendingPrimaryTerm(), globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener() {