CCR: Translog op on primary should have versionType

Normally translog operations will not be replayed on the primary.
Following engine is an exception where we replay translog on both
primary and replica as a non-primary strategy.  Even though we won't use
the version_type in the following engine, we still need to pass a valid
value for the primary operation in order not to trip assertions in an
engine.

This commit passes version_type EXTERNAL for translog operation if its
origin is primary.

Relates #31945
This commit is contained in:
Nhat Nguyen 2018-07-20 08:39:32 -04:00
parent a6b7497fdc
commit fe574f89f8
2 changed files with 6 additions and 2 deletions

View File

@ -1214,6 +1214,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
final Engine.Result result;
switch (operation.opType()) {
case INDEX:
@ -1221,14 +1223,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
null, index.getAutoGeneratedIdTimestamp(), true, origin,
versionType, index.getAutoGeneratedIdTimestamp(), true, origin,
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source())).routing(index.routing()));
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
null, origin);
versionType, origin);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;

View File

@ -44,6 +44,8 @@ public final class FollowingEngine extends InternalEngine {
if (operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalStateException("a following engine does not accept operations without an assigned sequence number");
}
assert (operation.origin() == Operation.Origin.PRIMARY) == (operation.versionType() == VersionType.EXTERNAL) :
"invalid version_type in a following engine; version_type=" + operation.versionType() + "origin=" + operation.origin();
}
@Override