TEST: Capture replication targets when replication group ready (#34407)

Today, WriteReplicationAction uses a set of replication targets directly
from the primary shard of ReplicationGroup. It should be fine except
when we add/remove or promote a shard while a write action is executing.
We have encountered these two issues:

1. Replicas are not found in the replication targets. This happens
because we remove replicas but the WriteReplicationAction still uses the
old replication targets which include the removed replicas.

2. Access ReplicationGroup from a primary shard which hasn't activated
the primary-mode yet. This is because we won't activate the primary-mode
for a promoting shard after bumping the primary term which is executed
asynchronously.

This commit captures the replication targets when the replication group
is ready and continue using those targets until we re-compute the new
targets after the group is changed.

Closes #33457
This commit is contained in:
Nhat Nguyen 2018-10-17 17:37:52 -04:00 committed by GitHub
parent a45626deb5
commit eb36f10394
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 42 additions and 16 deletions

View File

@ -164,6 +164,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
private final AtomicInteger replicaId = new AtomicInteger();
private final AtomicInteger docId = new AtomicInteger();
boolean closed = false;
private ReplicationTargets replicationTargets;
private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY,
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
(request, parentTask, primaryAllocationId, primaryTerm, listener) -> {
@ -277,6 +279,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
for (final IndexShard replica : replicas) {
recoverReplica(replica);
}
computeReplicationTargets();
}
public IndexShard addReplica() throws IOException {
@ -320,8 +323,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
*/
public Future<PrimaryReplicaSyncer.ResyncTask> promoteReplicaToPrimary(IndexShard replica) throws IOException {
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
promoteReplicaToPrimary(replica,
(shard, listener) -> primaryReplicaSyncer.resync(shard,
promoteReplicaToPrimary(replica, (shard, listener) -> {
computeReplicationTargets();
primaryReplicaSyncer.resync(shard,
new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
@Override
public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {
@ -334,7 +338,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
listener.onFailure(e);
fut.onFailure(e);
}
}));
});
});
return fut;
}
@ -370,6 +375,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final boolean removed = replicas.remove(replica);
if (removed) {
updateAllocationIDsOnPrimary();
computeReplicationTargets();
}
return removed;
}
@ -392,6 +398,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds,
routingTable);
ESIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
computeReplicationTargets();
}
public synchronized DiscoveryNode getPrimaryNode() {
@ -468,6 +475,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public synchronized void reinitPrimaryShard() throws IOException {
primary = reinitShard(primary);
computeReplicationTargets();
}
public void syncGlobalCheckpoint() {
@ -486,6 +494,24 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
currentClusterStateVersion.incrementAndGet(),
activeIds(), routingTable(Function.identity()), Collections.emptySet());
}
private synchronized void computeReplicationTargets() {
this.replicationTargets = new ReplicationTargets(primary, replicas);
}
private synchronized ReplicationTargets getReplicationTargets() {
return replicationTargets;
}
}
static final class ReplicationTargets {
final IndexShard primary;
final List<IndexShard> replicas;
ReplicationTargets(IndexShard primary, List<IndexShard> replicas) {
this.primary = primary;
this.replicas = Collections.unmodifiableList(replicas);
}
}
protected abstract class ReplicationAction<Request extends ReplicationRequest<Request>,
@ -493,13 +519,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
Response extends ReplicationResponse> {
private final Request request;
private ActionListener<Response> listener;
private final ReplicationGroup replicationGroup;
private final ReplicationTargets replicationTargets;
private final String opType;
protected ReplicationAction(Request request, ActionListener<Response> listener, ReplicationGroup group, String opType) {
this.request = request;
this.listener = listener;
this.replicationGroup = group;
this.replicationTargets = group.getReplicationTargets();
this.opType = opType;
}
@ -523,7 +549,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
IndexShard getPrimaryShard() {
return replicationGroup.primary;
return replicationTargets.primary;
}
protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception;
@ -534,7 +560,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
public ShardRouting routingEntry() {
return replicationGroup.primary.routingEntry();
return getPrimaryShard().routingEntry();
}
@Override
@ -544,37 +570,37 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
public PrimaryResult perform(Request request) throws Exception {
return performOnPrimary(replicationGroup.primary, request);
return performOnPrimary(getPrimaryShard(), request);
}
@Override
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
replicationGroup.getPrimary().updateLocalCheckpointForShard(allocationId, checkpoint);
getPrimaryShard().updateLocalCheckpointForShard(allocationId, checkpoint);
}
@Override
public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) {
replicationGroup.getPrimary().updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
getPrimaryShard().updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}
@Override
public long localCheckpoint() {
return replicationGroup.getPrimary().getLocalCheckpoint();
return getPrimaryShard().getLocalCheckpoint();
}
@Override
public long globalCheckpoint() {
return replicationGroup.getPrimary().getGlobalCheckpoint();
return getPrimaryShard().getGlobalCheckpoint();
}
@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return replicationGroup.getPrimary().getMaxSeqNoOfUpdatesOrDeletes();
return getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes();
}
@Override
public org.elasticsearch.index.shard.ReplicationGroup getReplicationGroup() {
return replicationGroup.primary.getReplicationGroup();
return getPrimaryShard().getReplicationGroup();
}
}
@ -588,10 +614,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
IndexShard replica = replicationGroup.replicas.stream()
IndexShard replica = replicationTargets.replicas.stream()
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
replica.acquireReplicaOperationPermit(
replicationGroup.primary.getPendingPrimaryTerm(),
getPrimaryShard().getPendingPrimaryTerm(),
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {