Replicate max seq_no of updates to replicas (#33967)

We started tracking max_seq_no_of_updates on the primary in #33842. This
commit replicates that value from a primary to its replicas in replication
requests and in the translog phase of peer recovery.

With this change, we guarantee that the value of max_seq_no_of_updates
on a replica, when any index/delete operation is performed on it, is at
least the value of max_seq_no_of_updates on the primary when that
operation was executed there.

Relates #33656
Nhat Nguyen 2018-09-25 08:07:57 -04:00 committed by GitHub
parent 970407c663
commit 5166dd0a4c
24 changed files with 386 additions and 125 deletions
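To make the guarantee above concrete, the following is a minimal, self-contained sketch in plain Java (not the Elasticsearch classes): a max_seq_no_of_updates marker only ever advances, and the UNASSIGNED value (-2) models the uninitialized state that appears below when talking to pre-7.0 nodes.

    import java.util.concurrent.atomic.AtomicLong;

    // Minimal sketch, not the ES implementation: a marker that can only move forward.
    final class MaxSeqNoOfUpdatesMarker {
        static final long UNASSIGNED = -2L; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

        private final AtomicLong maxSeqNoOfUpdates = new AtomicLong(UNASSIGNED);

        // Advance the marker to at least seqNo; never moves backwards.
        void advance(long seqNo) {
            maxSeqNoOfUpdates.accumulateAndGet(seqNo, Math::max);
        }

        long get() {
            return maxSeqNoOfUpdates.get();
        }
    }

For example, if a replica's marker is at 5 and the primary ships 8, the marker becomes 8; if a stale request later ships 3, the marker stays at 8. That monotonicity is what the replica-side assertions in the diffs below rely on.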


@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -114,9 +115,13 @@ public class ReplicationOperation<
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on the replica when this request is executed is at least the value on the primary when it was executed there.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
successfulShards.incrementAndGet(); // mark primary as successful
@ -136,7 +141,7 @@ public class ReplicationOperation<
}
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
final ReplicationGroup replicationGroup) {
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
// for total stats, add number of unassigned shards and
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
@ -145,19 +150,20 @@ public class ReplicationOperation<
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint);
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
}
}
}
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
@ -322,6 +328,12 @@ public class ReplicationOperation<
*/
long globalCheckpoint();
/**
* Returns the maximum seq_no of updates (index operations overwrite Lucene) or deletes on the primary.
* This value must be captured after the execution of a replication request on the primary is completed.
*/
long maxSeqNoOfUpdatesOrDeletes();
/**
* Returns the current replication group on the primary shard
*
@ -338,12 +350,15 @@ public class ReplicationOperation<
/**
* Performs the specified request on the specified replica.
*
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param globalCheckpoint the global checkpoint on the primary
* @param listener callback for handling the response or failure
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param globalCheckpoint the global checkpoint on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
* after this replication was executed on it.
* @param listener callback for handling the response or failure
*/
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener<ReplicaResponse> listener);
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);
/**
* Fail the specified shard if needed, removing it from the current set
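The ordering enforced above matters: the marker is sampled only after the primary has finished executing the operation, which is what makes the replica-side guarantee hold. A hedged sketch with hypothetical, simplified names (not the real ReplicationOperation API):

    // Hypothetical, simplified sketch of the capture ordering; names are illustrative only.
    final class PrimaryCaptureSketch {
        interface Primary {
            void execute(Object request);             // perform the operation locally
            long maxSeqNoOfUpdatesOrDeletes();        // current value of the marker
        }

        static long captureMsuForReplicas(Primary primary, Object request) {
            primary.execute(request);                    // 1. finish the op on the primary first
            return primary.maxSeqNoOfUpdatesOrDeletes(); // 2. only then sample the value to ship
        }
    }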


@ -200,7 +200,7 @@ public abstract class TransportReplicationAction<
/**
* Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
@ -489,6 +489,7 @@ public abstract class TransportReplicationAction<
replicaRequest.getTargetAllocationID(),
replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(),
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
channel,
(ReplicationTask) task).run();
}
@ -513,6 +514,7 @@ public abstract class TransportReplicationAction<
private final String targetAllocationID;
private final long primaryTerm;
private final long globalCheckpoint;
private final long maxSeqNoOfUpdatesOrDeletes;
private final TransportChannel channel;
private final IndexShard replica;
/**
@ -528,6 +530,7 @@ public abstract class TransportReplicationAction<
String targetAllocationID,
long primaryTerm,
long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes,
TransportChannel channel,
ReplicationTask task) {
this.request = request;
@ -536,6 +539,7 @@ public abstract class TransportReplicationAction<
this.targetAllocationID = targetAllocationID;
this.primaryTerm = primaryTerm;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
final ShardId shardId = request.shardId();
assert shardId != null : "request shardId must be set";
this.replica = getIndexShard(shardId);
@ -575,7 +579,8 @@ public abstract class TransportReplicationAction<
new TransportChannelResponseHandler<>(logger, channel, extraMessage,
() -> TransportResponse.Empty.INSTANCE);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint),
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
handler);
}
@ -613,7 +618,7 @@ public abstract class TransportReplicationAction<
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
}
/**
@ -1023,6 +1028,11 @@ public abstract class TransportReplicationAction<
return indexShard.getGlobalCheckpoint();
}
@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return indexShard.getMaxSeqNoOfUpdatesOrDeletes();
}
@Override
public ReplicationGroup getReplicationGroup() {
return indexShard.getReplicationGroup();
@ -1107,6 +1117,7 @@ public abstract class TransportReplicationAction<
final ShardRouting replica,
final ReplicaRequest request,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
String nodeId = replica.currentNodeId();
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
@ -1114,8 +1125,8 @@ public abstract class TransportReplicationAction<
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
sendReplicaRequest(replicaRequest, node, listener);
}
@ -1263,15 +1274,17 @@ public abstract class TransportReplicationAction<
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
private long globalCheckpoint;
private long maxSeqNoOfUpdatesOrDeletes;
public ConcreteReplicaRequest(final Supplier<R> requestSupplier) {
super(requestSupplier);
}
public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
final long globalCheckpoint) {
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
super(request, targetAllocationID, primaryTerm);
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
}
@Override
@ -1282,6 +1295,13 @@ public abstract class TransportReplicationAction<
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
} else {
// UNASSIGNED_SEQ_NO (-2) means uninitialized, and replicas will disable
// optimization using seq_no if its max_seq_no_of_updates is still uninitialized
maxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
@Override
@ -1290,12 +1310,19 @@ public abstract class TransportReplicationAction<
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(globalCheckpoint);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
}
}
public long getGlobalCheckpoint() {
return globalCheckpoint;
}
public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes;
}
@Override
public String toString() {
return "ConcreteReplicaRequest{" +
@ -1303,6 +1330,7 @@ public abstract class TransportReplicationAction<
", primaryTerm='" + getPrimaryTerm() + '\'' +
", request=" + getRequest() +
", globalCheckpoint=" + globalCheckpoint +
", maxSeqNoOfUpdatesOrDeletes=" + maxSeqNoOfUpdatesOrDeletes +
'}';
}
}
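The wire-format change above follows the usual backwards-compatibility pattern: the new field is exchanged only with nodes new enough to understand it, and anything older falls back to UNASSIGNED_SEQ_NO, which disables the seq_no-based optimization on the replica. Below is a self-contained sketch of that pattern using plain java.io streams; the version constant and class name are illustrative, not the ES StreamInput/StreamOutput API (the actual change uses readZLong/writeZLong as shown above).

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Illustrative sketch of a version-gated wire field; not the ES serialization API.
    final class VersionGatedFieldSketch {
        static final int V_7_0_0 = 7_00_00_99;  // hypothetical internal version id
        static final long UNASSIGNED_SEQ_NO = -2L;

        long maxSeqNoOfUpdatesOrDeletes = UNASSIGNED_SEQ_NO;

        void writeTo(DataOutput out, int remoteVersion) throws IOException {
            if (remoteVersion >= V_7_0_0) {      // only newer peers know about the field
                out.writeLong(maxSeqNoOfUpdatesOrDeletes);
            }
        }

        void readFrom(DataInput in, int remoteVersion) throws IOException {
            if (remoteVersion >= V_7_0_0) {
                maxSeqNoOfUpdatesOrDeletes = in.readLong();
            } else {
                // an older primary never sent the field: stay uninitialized
                maxSeqNoOfUpdatesOrDeletes = UNASSIGNED_SEQ_NO;
            }
        }
    }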


@ -47,6 +47,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
@ -976,6 +977,7 @@ public class InternalEngine extends Engine {
if (plan.addStaleOpToLucene) {
addStaleDocs(index.docs(), indexWriter);
} else if (plan.useLuceneUpdateDocument) {
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), plan.seqNoForIndexing, true, true);
updateDocs(index.uid(), index.docs(), indexWriter);
} else {
// document does not exists, we can optimize for create, but double check if assertions are running
@ -1275,8 +1277,8 @@ public class InternalEngine extends Engine {
return plan;
}
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
throws IOException {
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), plan.seqNoOfDeletion, false, false);
try {
if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
@ -2556,6 +2558,29 @@ public class InternalEngine extends Engine {
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}
private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) {
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
// If the primary is on an old version which does not replicate msu, we need to relax this assertion for that.
if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_7_0_0_alpha1);
return true;
}
// We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument).
if (allowDeleted) {
final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes());
if (versionValue != null && versionValue.isDelete()) {
return true;
}
}
// Operations can be processed on a replica in a different order than on the primary. If the order on the primary is index-1,
// delete-2, index-3, and the order on a replica is index-1, index-3, delete-2, then the msu of index-3 on the replica is 2
// even though it is an update (overwrites index-1). We should relax this assertion if there is a pending gap in the seq_no.
if (relaxIfGapInSeqNo && getLocalCheckpoint() < maxSeqNoOfUpdates) {
return true;
}
assert seqNo <= maxSeqNoOfUpdates : "id=" + id + " seq_no=" + seqNo + " msu=" + maxSeqNoOfUpdates;
return true;
}
@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
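The new assertMaxSeqNoOfUpdatesIsAdvanced check has three escape hatches before it enforces seq_no <= msu. A standalone restatement of that decision logic (simplified; the parameters collapse a few details of the real method and are illustrative only):

    // Simplified, standalone sketch of the assertion's decision logic; not the ES method.
    final class MsuAssertionSketch {
        static final long UNASSIGNED_SEQ_NO = -2L;

        static boolean maxSeqNoOfUpdatesIsAdvanced(long seqNo, long maxSeqNoOfUpdates,
                                                   long localCheckpoint,
                                                   boolean docIsDeletedInVersionMap,
                                                   boolean relaxIfGapInSeqNo) {
            if (maxSeqNoOfUpdates == UNASSIGNED_SEQ_NO) {
                return true; // an old primary never replicated the marker
            }
            if (docIsDeletedInVersionMap) {
                return true; // a delete tombstone is updated like a regular doc on replicas
            }
            if (relaxIfGapInSeqNo && localCheckpoint < maxSeqNoOfUpdates) {
                return true; // ops may arrive out of order while a seq_no gap is pending
            }
            return seqNo <= maxSeqNoOfUpdates; // the invariant being asserted
        }
    }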


@ -502,6 +502,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
*/
final Engine engine = getEngine();
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// If the old primary was on an old version that did not replicate the msu,
// we need to bootstrap it manually from its local history.
assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
@ -511,12 +517,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) :
// indexSettings.getIndexVersionCreated();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
}
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
@ -1955,12 +1955,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
// If the old primary was on an old version, this primary (was replica before)
// does not have max_of_updates yet. Thus we need to bootstrap it manually.
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated();
getEngine().initializeMaxSeqNoOfUpdatesOrDeletes();
// If the old primary was on an old version that did not replicate the msu,
// we need to bootstrap it manually from its local history.
assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1);
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
}
}
@ -2316,15 +2315,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
*
* @param opPrimaryTerm the operation primary term
* @param globalCheckpoint the global checkpoint associated with the request
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
* @param opPrimaryTerm the operation primary term
* @param globalCheckpoint the global checkpoint associated with the request
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are
* enabled the tracing will capture the supplied object's {@link Object#toString()} value.
* Otherwise the object isn't used
*/
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint,
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
verifyNotClosed();
@ -2378,6 +2379,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
@ -2729,12 +2731,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
newEngine = createNewEngine(newEngineConfig());
active.set(true);
}
newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add a dedicate recovery stats for the reset translog
});
// TODO: do not use init method here but use advance with the max_seq_no received from the primary
newEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
}
@ -2763,10 +2764,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* These transfers guarantee that every index/delete operation, when executing on a replica engine, will observe a value of this marker
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long)
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO
|| getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
"replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not";
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
}
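Two things fall out of the IndexShard changes above: when a replica operation permit is acquired, the global checkpoint is updated and then the marker is advanced before the operation runs; and if the marker is still uninitialized because the old primary predates this change, the shard bootstraps it from its local history. A short sketch of that fallback, reusing the MaxSeqNoOfUpdatesMarker sketch from the commit-message note (maxSeqNoFromLocalHistory stands in for seqNoStats().getMaxSeqNo()):

    // Hedged sketch of the pre-7.0 fallback, reusing the MaxSeqNoOfUpdatesMarker sketch above.
    final class MsuBootstrapSketch {
        static void bootstrapIfNeeded(MaxSeqNoOfUpdatesMarker marker, long maxSeqNoFromLocalHistory) {
            if (marker.get() == MaxSeqNoOfUpdatesMarker.UNASSIGNED) {
                // an old primary never shipped the marker: seed it from the shard's local history
                marker.advance(maxSeqNoFromLocalHistory);
            }
        }
    }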


@ -456,7 +456,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary());
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (MapperException exception) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node


@ -215,10 +215,12 @@ public class RecoverySourceHandler {
}
final long targetLocalCheckpoint;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
// We have to capture the max auto_id_timestamp after taking a snapshot of operations to guarantee
// that the auto_id_timestamp of every operation in the snapshot is at most this timestamp value.
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
@ -445,16 +447,17 @@ public class RecoverySourceHandler {
* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
* shard.
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @return the local checkpoint on the target
*/
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot,
final long maxSeenAutoIdTimestamp)
final long maxSeenAutoIdTimestamp, final long maxSeqNoOfUpdatesOrDeletes)
throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
@ -468,7 +471,7 @@ public class RecoverySourceHandler {
// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
@ -531,16 +534,18 @@ public class RecoverySourceHandler {
* <p>
* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
*
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
* total number of operations sent
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
* total number of operations sent
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp) throws IOException {
final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
@ -558,8 +563,11 @@ public class RecoverySourceHandler {
logger.trace("no translog operations to send");
}
final CancellableThreads.IOInterruptable sendBatch = () ->
targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp));
final CancellableThreads.IOInterruptable sendBatch = () -> {
final long targetCheckpoint = recoveryTarget.indexTranslogOperations(
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
targetLocalCheckpoint.set(targetCheckpoint);
};
// send operations in batches
Translog.Operation operation;


@ -386,8 +386,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary) throws IOException {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
@ -401,6 +401,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on it.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
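On the recovery target the same rule applies to the translog phase: both primary-derived markers are installed before any operation in the batch is replayed, so every replayed update observes values at least as high as they were on the primary. A hedged sketch with a hypothetical interface (not the RecoveryTarget API):

    import java.util.List;

    // Hypothetical interface and helper, for illustration only; not the ES RecoveryTarget API.
    interface RecoveryTargetShardSketch {
        void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampOnPrimary);
        void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
        void applyTranslogOperation(Object operation);
    }

    final class TranslogReplaySketch {
        static void replayBatch(RecoveryTargetShardSketch shard, List<Object> operations,
                                long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOnPrimary) {
            // install both markers first, before replaying any operation
            shard.updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
            shard.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOnPrimary);
            for (Object operation : operations) {
                shard.applyTranslogOperation(operation);
            }
        }
    }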


@ -59,13 +59,17 @@ public interface RecoveryTargetHandler {
/**
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
* @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard
*
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
* @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard
* @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
* the primary shard when capturing these operations. This value is at least as high as the
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
* @return the local checkpoint on the target shard
*/
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary) throws IOException;
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException;
/**
* Notifies the target of the files it is going to receive


@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.TransportRequest;
@ -37,17 +38,19 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
private List<Translog.Operation> operations;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private long maxSeenAutoIdTimestampOnPrimary;
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
public RecoveryTranslogOperationsRequest() {
}
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations,
int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;
this.totalTranslogOps = totalTranslogOps;
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
}
public long recoveryId() {
@ -70,6 +73,10 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
return maxSeenAutoIdTimestampOnPrimary;
}
public long maxSeqNoOfUpdatesOrDeletesOnPrimary() {
return maxSeqNoOfUpdatesOrDeletesOnPrimary;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -82,6 +89,12 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeqNoOfUpdatesOrDeletesOnPrimary = in.readZLong();
} else {
// UNASSIGNED_SEQ_NO means uninitialized and replica won't enable optimization using seq_no
maxSeqNoOfUpdatesOrDeletesOnPrimary = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
@Override
@ -94,5 +107,8 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary);
}
}
}


@ -110,9 +110,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
final RecoveryTranslogOperationsRequest translogOperationsRequest =
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary);
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary) {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary);
final TransportFuture<RecoveryTranslogOperationsResponse> future = transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,


@ -443,6 +443,7 @@ public class ReplicationOperationTests extends ESTestCase {
final ShardRouting routing;
final long localCheckpoint;
final long globalCheckpoint;
final long maxSeqNoOfUpdatesOrDeletes;
final Supplier<ReplicationGroup> replicationGroupSupplier;
final Map<String, Long> knownLocalCheckpoints = new HashMap<>();
final Map<String, Long> knownGlobalCheckpoints = new HashMap<>();
@ -452,6 +453,7 @@ public class ReplicationOperationTests extends ESTestCase {
this.replicationGroupSupplier = replicationGroupSupplier;
this.localCheckpoint = random().nextLong();
this.globalCheckpoint = randomNonNegativeLong();
this.maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong();
}
@Override
@ -515,6 +517,11 @@ public class ReplicationOperationTests extends ESTestCase {
return globalCheckpoint;
}
@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes;
}
@Override
public ReplicationGroup getReplicationGroup() {
return replicationGroupSupplier.get();
@ -571,6 +578,7 @@ public class ReplicationOperationTests extends ESTestCase {
final ShardRouting replica,
final Request request,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
if (opFailures.containsKey(replica)) {


@ -625,6 +625,7 @@ public class TransportReplicationActionTests extends ESTestCase {
routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState),
new Request(),
randomNonNegativeLong(),
randomNonNegativeLong(),
listener);
assertTrue(listener.isDone());
assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class);
@ -633,7 +634,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream()
.filter(ShardRouting::assignedToNode).collect(Collectors.toList()));
listener = new PlainActionFuture<>();
proxy.performOn(replica, new Request(), randomNonNegativeLong(), listener);
proxy.performOn(replica, new Request(), randomNonNegativeLong(), randomNonNegativeLong(), listener);
assertFalse(listener.isDone());
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
@ -805,7 +806,7 @@ public class TransportReplicationActionTests extends ESTestCase {
replicaOperationTransportHandler.messageReceived(
new TransportReplicationAction.ConcreteReplicaRequest<>(
new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
randomNonNegativeLong()),
randomNonNegativeLong(), randomNonNegativeLong()),
createTransportChannel(new PlainActionFuture<>()), task);
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), containsString("simulated"));
@ -895,7 +896,7 @@ public class TransportReplicationActionTests extends ESTestCase {
Request request = new Request(shardId).timeout("1ms");
action.new ReplicaOperationTransportHandler().messageReceived(
new TransportReplicationAction.ConcreteReplicaRequest<>(request, "_not_a_valid_aid_", randomNonNegativeLong(),
randomNonNegativeLong()),
randomNonNegativeLong(), randomNonNegativeLong()),
createTransportChannel(listener), maybeTask()
);
try {
@ -939,8 +940,10 @@ public class TransportReplicationActionTests extends ESTestCase {
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId);
final long checkpoint = randomNonNegativeLong();
final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong();
replicaOperationTransportHandler.messageReceived(
new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint),
new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(),
primaryTerm, checkpoint, maxSeqNoOfUpdatesOrDeletes),
createTransportChannel(listener), task);
if (listener.isDone()) {
listener.get(); // fail with the exception if there
@ -964,6 +967,8 @@ public class TransportReplicationActionTests extends ESTestCase {
assertThat(capturedRequest.request, instanceOf(TransportReplicationAction.ConcreteReplicaRequest.class));
assertThat(((TransportReplicationAction.ConcreteReplicaRequest) capturedRequest.request).getGlobalCheckpoint(),
equalTo(checkpoint));
assertThat(((TransportReplicationAction.ConcreteReplicaRequest) capturedRequest.request).getMaxSeqNoOfUpdatesOrDeletes(),
equalTo(maxSeqNoOfUpdatesOrDeletes));
assertConcreteShardRequest(capturedRequest.request, request, replica.allocationId());
}
@ -1004,8 +1009,10 @@ public class TransportReplicationActionTests extends ESTestCase {
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId);
final long checkpoint = randomNonNegativeLong();
final long maxSeqNoOfUpdates = randomNonNegativeLong();
replicaOperationTransportHandler.messageReceived(
new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint),
new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(),
primaryTerm, checkpoint, maxSeqNoOfUpdates),
createTransportChannel(listener), task);
if (listener.isDone()) {
listener.get(); // fail with the exception if there
@ -1198,7 +1205,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[2];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[3];
final long primaryTerm = indexShard.getPendingPrimaryTerm();
if (term < primaryTerm) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
@ -1207,7 +1214,8 @@ public class TransportReplicationActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
}).when(indexShard)
.acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());


@ -277,7 +277,7 @@ public class TransportWriteActionTests extends ESTestCase {
TestShardRouting.newShardRouting(shardId, "NOT THERE",
routingState == ShardRoutingState.RELOCATING ? state.nodes().iterator().next().getId() : null, false, routingState),
new TestRequest(),
randomNonNegativeLong(), listener);
randomNonNegativeLong(), randomNonNegativeLong(), listener);
assertTrue(listener.isDone());
assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class);
@ -285,7 +285,7 @@ public class TransportWriteActionTests extends ESTestCase {
final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream()
.filter(ShardRouting::assignedToNode).collect(Collectors.toList()));
listener = new PlainActionFuture<>();
proxy.performOn(replica, new TestRequest(), randomNonNegativeLong(), listener);
proxy.performOn(replica, new TestRequest(), randomNonNegativeLong(), randomNonNegativeLong(), listener);
assertFalse(listener.isDone());
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
@ -462,7 +462,8 @@ public class TransportWriteActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
}).when(indexShard)
.acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());


@ -338,7 +338,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
barrier.await();
indexOnReplica(replicationRequest, shards, replica2, newReplica1Term);
} catch (IllegalStateException ise) {
assertThat(ise.getMessage(), either(containsString("is too old")).or(containsString("cannot be a replication target")));
assertThat(ise.getMessage(), either(containsString("is too old"))
.or(containsString("cannot be a replication target")).or(containsString("engine is closed")));
} catch (Exception e) {
throw new RuntimeException(e);
}


@ -405,6 +405,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
}
shards.assertAllEqual(initialDocs + extraDocs);
for (IndexShard replica : shards.getReplicas()) {
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(),
greaterThanOrEqualTo(shards.getPrimary().getMaxSeqNoOfUpdatesOrDeletes()));
}
// check translog on replica is trimmed
int translogOperations = 0;
@ -490,9 +494,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}) {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary) throws IOException {
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException {
opsSent.set(true);
return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary);
return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdates);
}
};
});
@ -560,7 +564,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
@Override
public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps,
final long maxAutoIdTimestamp)
final long maxAutoIdTimestamp, long maxSeqNoOfUpdates)
throws IOException {
// index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers(1);
@ -588,7 +592,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp);
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates);
}
});
pendingDocActiveWithExtraDocIndexed.await();
@ -718,11 +722,11 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp) throws IOException {
long maxAutoIdTimestamp, long maxSeqNoOfUpdates) throws IOException {
if (hasBlocked() == false) {
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp);
return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates);
}
@Override


@ -315,8 +315,8 @@ public class IndexShardTests extends IndexShardTestCase {
// expected
}
try {
indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null,
ThreadPool.Names.WRITE, "");
indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "");
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
@ -327,7 +327,7 @@ public class IndexShardTests extends IndexShardTestCase {
IndexShard indexShard = newShard(false);
expectThrows(IndexShardNotStartedException.class, () ->
indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100),
SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.WRITE, ""));
SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, ""));
closeShards(indexShard);
}
@ -351,6 +351,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexShard.acquireReplicaOperationPermit(
indexShard.getPendingPrimaryTerm(),
indexShard.getGlobalCheckpoint(),
indexShard.getMaxSeqNoOfUpdatesOrDeletes(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
@ -602,7 +603,7 @@ public class IndexShardTests extends IndexShardTestCase {
if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm,
indexShard.getGlobalCheckpoint(), new ActionListener<Releasable>() {
indexShard.getGlobalCheckpoint(), indexShard.getMaxSeqNoOfUpdatesOrDeletes(), new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
fail();
@ -628,7 +629,8 @@ public class IndexShardTests extends IndexShardTestCase {
private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.WRITE, "");
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(),
randomNonNegativeLong(), fut, ThreadPool.Names.WRITE, "");
return fut.get();
}
@ -712,8 +714,8 @@ public class IndexShardTests extends IndexShardTestCase {
}
};
indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired,
ThreadPool.Names.WRITE, "");
indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO,
randomNonNegativeLong(), onLockAcquired, ThreadPool.Names.WRITE, "");
assertFalse(onResponse.get());
assertTrue(onFailure.get());
@ -785,6 +787,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexShard.acquireReplicaOperationPermit(
newPrimaryTerm,
newGlobalCheckPoint,
randomNonNegativeLong(),
listener,
ThreadPool.Names.SAME, "");
} catch (Exception e) {
@ -836,6 +839,22 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception {
IndexShard replica = newStartedShard(false);
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
long currentMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replica.advanceMaxSeqNoOfUpdatesOrDeletes(currentMaxSeqNoOfUpdates);
long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
replica.acquireReplicaOperationPermit(replica.operationPrimaryTerm, replica.getGlobalCheckpoint(),
newMaxSeqNoOfUpdates, fut, ThreadPool.Names.WRITE, "");
try (Releasable ignored = fut.actionGet()) {
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates)));
}
closeShards(replica);
}
public void testGlobalCheckpointSync() throws IOException {
// create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked
final ShardId shardId = new ShardId("index", "_na_", 0);
@ -906,11 +925,14 @@ public class IndexShardTests extends IndexShardTestCase {
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
final long currentMaxSeqNoOfUpdates = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquireReplicaOperationPermit(
indexShard.getPendingPrimaryTerm() + 1,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
@ -926,6 +948,9 @@ public class IndexShardTests extends IndexShardTestCase {
ThreadPool.Names.SAME, "");
latch.await();
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max(
Arrays.asList(maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica))
));
final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary();
final CountDownLatch resyncLatch = new CountDownLatch(1);
@ -941,7 +966,9 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo));
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max(
Arrays.asList(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica))
));
closeShard(indexShard, false);
}
@ -961,9 +988,11 @@ public class IndexShardTests extends IndexShardTestCase {
final boolean shouldRollback = Math.max(globalCheckpoint, globalCheckpointOnReplica) < indexShard.seqNoStats().getMaxSeqNo()
&& indexShard.seqNoStats().getMaxSeqNo() != SequenceNumbers.NO_OPS_PERFORMED;
final Engine beforeRollbackEngine = indexShard.getEngine();
final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE);
indexShard.acquireReplicaOperationPermit(
indexShard.pendingPrimaryTerm + 1,
globalCheckpoint,
newMaxSeqNoOfUpdates,
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
@ -990,6 +1019,7 @@ public class IndexShardTests extends IndexShardTestCase {
} else {
assertThat(indexShard.getEngine(), sameInstance(beforeRollbackEngine));
}
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(newMaxSeqNoOfUpdates));
// ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
@ -1016,6 +1046,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexShard.acquireReplicaOperationPermit(
primaryTerm + increment,
indexShard.getGlobalCheckpoint(),
randomNonNegativeLong(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
@ -1628,6 +1659,7 @@ public class IndexShardTests extends IndexShardTestCase {
* - If flush and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed.
*/
final IndexShard shard = newStartedShard(false);
shard.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id");
shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation
shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
@ -2193,9 +2225,10 @@ public class IndexShardTests extends IndexShardTestCase {
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestamp) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp);
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(
operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
assertFalse(replica.isSyncNeeded());
return localCheckpoint;
}
@ -2302,8 +2335,9 @@ public class IndexShardTests extends IndexShardTestCase {
}) {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp);
long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
// Shard should now be active since we did recover:
assertTrue(replica.isActive());
return localCheckpoint;
@ -2350,8 +2384,9 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxAutoIdTimestamp) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp);
long maxAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
assertListenerCalled.accept(replica);
return localCheckpoint;
}
@ -3434,6 +3469,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint));
assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint));
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(globalCheckpoint));
closeShard(shard, false);
}


@ -207,11 +207,12 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public Translog.Operation next() throws IOException {
return operations.get(counter++);
}
}, randomNonNegativeLong());
}, randomNonNegativeLong(), randomNonNegativeLong());
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
assertThat(result.totalOperations, equalTo(expectedOps));
final ArgumentCaptor<List> shippedOpsCaptor = ArgumentCaptor.forClass(List.class);
verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(), ArgumentCaptor.forClass(Long.class).capture());
verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(),
ArgumentCaptor.forClass(Long.class).capture(), ArgumentCaptor.forClass(Long.class).capture());
List<Translog.Operation> shippedOps = new ArrayList<>();
for (List list: shippedOpsCaptor.getAllValues()) {
shippedOps.addAll(list);
@ -249,7 +250,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
} while (op != null && opsToSkip.contains(op));
return op;
}
}, randomNonNegativeLong()));
}, randomNonNegativeLong(), randomNonNegativeLong()));
}
}
@ -421,7 +422,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
@Override
long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp) {
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) {
phase2Called.set(true);
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}

View File

@ -127,6 +127,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
final String indexName = orgReplica.shardId().getIndexName();
// delete #1
orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
getTranslog(orgReplica).rollGeneration(); // isolate the delete in its own generation
// index #0
@ -190,6 +191,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
final String indexName = orgReplica.shardId().getIndexName();
// delete #1
orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
orgReplica.flush(new FlushRequest().force(true)); // isolate delete #1 in its own translog generation and lucene segment
// index #0

View File

@ -503,7 +503,7 @@ public abstract class EngineTestCase extends ESTestCase {
@Nullable final ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
final EngineConfig config) {
if (localCheckpointTrackerSupplier == null) {
return new InternalEngine(config) {
return new InternalTestEngine(config) {
@Override
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?
@ -519,7 +519,7 @@ public abstract class EngineTestCase extends ESTestCase {
}
};
} else {
return new InternalEngine(config, localCheckpointTrackerSupplier) {
return new InternalTestEngine(config, localCheckpointTrackerSupplier) {
@Override
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;

import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * An alternative to {@link InternalEngine} that allows tweaking internals to reduce noise in engine tests.
 */
class InternalTestEngine extends InternalEngine {
    private final Map<String, Long> idToMaxSeqNo = ConcurrentCollections.newConcurrentMap();

    InternalTestEngine(EngineConfig engineConfig) {
        super(engineConfig);
    }

    InternalTestEngine(EngineConfig engineConfig, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
        super(engineConfig, localCheckpointTrackerSupplier);
    }

    @Override
    public IndexResult index(Index index) throws IOException {
        if (index.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
            idToMaxSeqNo.compute(index.id(), (id, existing) -> {
                if (existing == null) {
                    return index.seqNo();
                } else {
                    long maxSeqNo = Math.max(index.seqNo(), existing);
                    advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
                    return maxSeqNo;
                }
            });
        }
        return super.index(index);
    }

    @Override
    public DeleteResult delete(Delete delete) throws IOException {
        if (delete.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
            final long maxSeqNo = idToMaxSeqNo.compute(delete.id(), (id, existing) -> {
                if (existing == null) {
                    return delete.seqNo();
                } else {
                    return Math.max(delete.seqNo(), existing);
                }
            });
            advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
        }
        return super.delete(delete);
    }
}
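
The asymmetry between the two overrides above is deliberate: deletes always advance max_seq_no_of_updates, while an index operation advances it only when the same id has been indexed before, i.e. when it is effectively an update rather than an append. A condensed, hypothetical restatement of that rule, separate from the engine code and with illustrative names only:

final class MsuAdvanceRule {
    // msu only has to move for operations that can affect an existing document:
    // every delete, and every index of an id that has been indexed before.
    static boolean mustAdvance(boolean isDelete, boolean idSeenBefore) {
        return isDelete || idSeenBefore;
    }
}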

View File

@ -97,6 +97,7 @@ import java.util.stream.StreamSupport;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase {
@ -444,6 +445,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
for (IndexShard replica : replicas) {
try {
assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp()));
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(primary.getMaxSeqNoOfUpdatesOrDeletes()));
} catch (AlreadyClosedException ignored) {
}
}
@ -563,6 +565,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return replicationGroup.getPrimary().getGlobalCheckpoint();
}
@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return replicationGroup.getPrimary().getMaxSeqNoOfUpdatesOrDeletes();
}
@Override
public org.elasticsearch.index.shard.ReplicationGroup getReplicationGroup() {
return replicationGroup.primary.getReplicationGroup();
@ -577,12 +584,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final ShardRouting replicaRouting,
final ReplicaRequest request,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
IndexShard replica = replicationGroup.replicas.stream()
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
replica.acquireReplicaOperationPermit(
replicationGroup.primary.getPendingPrimaryTerm(),
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
@ -659,7 +668,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
executeShardBulkOnReplica(request, replica, getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint());
executeShardBulkOnReplica(request, replica, getPrimaryShard().getPendingPrimaryTerm(),
getPrimaryShard().getGlobalCheckpoint(), getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes());
}
}
@ -690,10 +700,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm,
long globalCheckpointOnPrimary) throws Exception {
long globalCheckpointOnPrimary, long maxSeqNoOfUpdatesOrDeletes) throws Exception {
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
replica.acquireReplicaOperationPermit(
operationPrimaryTerm, globalCheckpointOnPrimary, permitAcquiredFuture, ThreadPool.Names.SAME, request);
replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary,
maxSeqNoOfUpdatesOrDeletes, permitAcquiredFuture, ThreadPool.Names.SAME, request);
final Translog.Location location;
try (Releasable ignored = permitAcquiredFuture.actionGet()) {
location = TransportShardBulkAction.performOnReplica(request, replica);
@ -723,14 +733,16 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica, long term) throws Exception {
executeShardBulkOnReplica(request, replica, term, group.primary.getGlobalCheckpoint());
executeShardBulkOnReplica(request, replica, term,
group.primary.getGlobalCheckpoint(), group.primary.getMaxSeqNoOfUpdatesOrDeletes());
}
/**
* Executes the delete request on the given replica shard.
*/
void deleteOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
executeShardBulkOnReplica(request, replica, group.primary.getPendingPrimaryTerm(), group.primary.getGlobalCheckpoint());
executeShardBulkOnReplica(request, replica, group.primary.getPendingPrimaryTerm(),
group.primary.getGlobalCheckpoint(), group.primary.getMaxSeqNoOfUpdatesOrDeletes());
}
class GlobalCheckpointSync extends ReplicationAction<
@ -774,7 +786,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected void performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
executeResyncOnReplica(replica, request, getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint());
executeResyncOnReplica(replica, request, getPrimaryShard().getPendingPrimaryTerm(),
getPrimaryShard().getGlobalCheckpoint(), getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes());
}
}
@ -787,12 +800,12 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return result;
}
private void executeResyncOnReplica(IndexShard replica, ResyncReplicationRequest request,
long operationPrimaryTerm, long globalCheckpointOnPrimary) throws Exception {
private void executeResyncOnReplica(IndexShard replica, ResyncReplicationRequest request, long operationPrimaryTerm,
long globalCheckpointOnPrimary, long maxSeqNoOfUpdatesOrDeletes) throws Exception {
final Translog.Location location;
final PlainActionFuture<Releasable> acquirePermitFuture = new PlainActionFuture<>();
replica.acquireReplicaOperationPermit(
operationPrimaryTerm, globalCheckpointOnPrimary, acquirePermitFuture, ThreadPool.Names.SAME, request);
replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary,
maxSeqNoOfUpdatesOrDeletes, acquirePermitFuture, ThreadPool.Names.SAME, request);
try (Releasable ignored = acquirePermitFuture.actionGet()) {
location = TransportResyncReplicationAction.performOnReplica(request, replica);
}
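
Every replica-side helper in this harness now follows the same shape: read the primary's global checkpoint and max_seq_no_of_updates, hand both to the replica operation permit, and apply the replicated request while holding that permit. A condensed sketch of the call pattern, reusing only names that appear in the hunks above (getPrimaryShard, replica, request, PlainActionFuture, ThreadPool), so it is a fragment of the harness rather than a standalone test:

final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
replica.acquireReplicaOperationPermit(
    getPrimaryShard().getPendingPrimaryTerm(),         // term under which the primary executed the operation
    getPrimaryShard().getGlobalCheckpoint(),            // global checkpoint observed on the primary
    getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes(),  // msu captured on the primary for this request
    permitFuture, ThreadPool.Names.SAME, request);
try (Releasable ignored = permitFuture.actionGet()) {
    // apply the replicated request on the replica while holding the permit
}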

View File

@ -699,8 +699,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(),
shard.getLocalCheckpoint());
} else {
result = shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
result = shard.applyIndexOperationOnReplica(seqNo, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new TransportReplicationAction.RetryOnReplicaException(shard.shardId,
"Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate());
@ -720,7 +721,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
result = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL);
shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getEngine().getLocalCheckpoint());
} else {
result = shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id);
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
result = shard.applyDeleteOperationOnReplica(seqNo, 0L, type, id);
}
return result;
}
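
These helpers apply operations to a replica shard directly, without the replication layer that would normally carry max_seq_no_of_updates alongside the request, so each one stands in for that replication by advancing msu by hand before the replica-side operation runs. A minimal sketch of the pattern, assuming the same shard, type and id variables used above:

final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;  // next sequence number, chosen locally by the test
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);           // stands in for msu replication from a primary
shard.applyDeleteOperationOnReplica(seqNo, 0L, type, id);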

View File

@ -103,6 +103,8 @@ public class TransportBulkShardOperationsAction
}
return operationWithPrimaryTerm;
}).collect(Collectors.toList());
// TODO: Replace this artificial value with the actual max_seq_no_of_updates from the leader
targetOperations.stream().mapToLong(Translog.Operation::seqNo).max().ifPresent(primary::advanceMaxSeqNoOfUpdatesOrDeletes);
final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations);
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
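
Until the TODO above is resolved and the leader ships its real max_seq_no_of_updates, the follower primary falls back to the largest sequence number in the incoming batch. That value is a conservative upper bound: it is at least the seq_no of every update or delete the batch contains, so advancing msu to it preserves correctness, at the price of possibly treating more operations as potential updates than strictly necessary. A minimal sketch of the fallback, assuming the targetOperations list and primary reference from the hunk above:

// conservative stand-in for the leader's max_seq_no_of_updates
targetOperations.stream()
    .mapToLong(Translog.Operation::seqNo)
    .max()
    .ifPresent(primary::advanceMaxSeqNoOfUpdatesOrDeletes);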

View File

@ -127,6 +127,7 @@ public class FollowingEngineTests extends ESTestCase {
final VersionType versionType =
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE);
final List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, versionType, 2, 2, 20, "id");
ops.stream().mapToLong(op -> op.seqNo()).max().ifPresent(followingEngine::advanceMaxSeqNoOfUpdatesOrDeletes);
EngineTestCase.assertOpsOnReplica(ops, followingEngine, true, logger);
}
}
@ -160,6 +161,7 @@ public class FollowingEngineTests extends ESTestCase {
seqNo,
Engine.Operation.Origin.PRIMARY,
(followingEngine, delete) -> {
followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(seqNo, Long.MAX_VALUE));
final Engine.DeleteResult result = followingEngine.delete(delete);
assertThat(result.getSeqNo(), equalTo(seqNo));
});
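
The delete case above advances the following engine's msu to a random value anywhere between the operation's seq_no and Long.MAX_VALUE, which underlines that only a lower bound matters to this test: msu just has to be at least the seq_no of the delete being applied, not an exact figure. A minimal sketch of setting up that precondition, assuming the followingEngine, seqNo and delete objects from the snippet above:

// any value >= seqNo satisfies the precondition; the exact figure is irrelevant to the test
followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(seqNo, Long.MAX_VALUE));
final Engine.DeleteResult result = followingEngine.delete(delete);
assertThat(result.getSeqNo(), equalTo(seqNo));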