Remove versionType from translog (#31945)
With the introduction of sequence numbers, we no longer use versionType to resolve out-of-order collisions in replication and recovery requests. This PR removes versionType from the translog. We can only remove it in 7.0 because it is still required in a mixed cluster between 6.x and 5.x.
parent 351bbb8906
commit df1380b8d3
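The heart of this change is a bump of the translog operation serialization format: Translog.Index and Translog.Delete no longer write a version-type byte, but the byte is still skipped when reading older formats and still emitted (as EXTERNAL) when writing to a pre-7.0 stream, which keeps mixed-version clusters working. Below is a minimal, self-contained sketch of that read/write pattern; plain Java data streams stand in for Elasticsearch's StreamInput/StreamOutput, and the class and method names are illustrative only, not the actual API.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TranslogFormatSketch {
    // Format constants mirroring the idea in the diff: the newest format drops the version-type byte.
    static final int FORMAT_6_0 = 8;                                // still carries a version-type byte
    static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1;             // since 7.0
    static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; // since 7.0, no version-type byte
    static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE;

    static final byte EXTERNAL_VERSION_TYPE = 1; // value replicated operations always used

    // Write the operation's version; only pre-7.0 targets still get the legacy version-type byte.
    static void writeVersion(DataOutputStream out, boolean targetBeforeV7, long version) throws IOException {
        final int format = targetBeforeV7 ? FORMAT_6_0 : SERIALIZATION_FORMAT;
        out.writeInt(format);
        out.writeLong(version);
        if (format < FORMAT_NO_VERSION_TYPE) {
            out.writeByte(EXTERNAL_VERSION_TYPE); // kept only for wire/disk compatibility
        }
    }

    // Read the version back; skip the version-type byte if the writer included one.
    static long readVersion(DataInputStream in) throws IOException {
        final int format = in.readInt();
        final long version = in.readLong();
        if (format < FORMAT_NO_VERSION_TYPE) {
            in.readByte(); // _version_type, ignored since 7.0
        }
        return version;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeVersion(new DataOutputStream(bytes), true, 3L); // pre-7.0 target keeps the byte
        long version = readVersion(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println("round-tripped version = " + version); // 3
    }
}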
@@ -523,13 +523,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
.routing(indexRequest.routing());
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(), sourceToParse);
indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
break;
case DELETE:
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.type(), deleteRequest.id());
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "

@@ -85,13 +85,6 @@ public enum VersionType implements Writeable {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
}

@Override
public VersionType versionTypeForReplicationAndRecovery() {
// replicas get the version from the primary after increment. The same version is stored in
// the transaction log. -> the should use the external semantics.
return EXTERNAL;
}
},
EXTERNAL((byte) 1) {
@Override
@@ -333,14 +326,6 @@ public enum VersionType implements Writeable {
*/
public abstract boolean validateVersionForReads(long version);

/**
* Some version types require different semantics for primary and replicas. This version allows
* the type to override the default behavior.
*/
public VersionType versionTypeForReplicationAndRecovery() {
return this;
}

public static VersionType fromString(String versionType) {
if ("internal".equals(versionType)) {
return INTERNAL;

@@ -1168,6 +1168,7 @@ public abstract class Engine implements Closeable {
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
@@ -1245,6 +1246,7 @@ public abstract class Engine implements Closeable {
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
}

@@ -691,7 +691,7 @@ public class InternalEngine extends Engine {
return true;
case PEER_RECOVERY:
case REPLICA:
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
assert index.version() == 1 && index.versionType() == null
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
@@ -704,20 +704,6 @@ public class InternalEngine extends Engine {
return false;
}

private boolean assertVersionType(final Engine.Operation operation) {
if (operation.origin() == Operation.Origin.REPLICA ||
operation.origin() == Operation.Origin.PEER_RECOVERY ||
operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
// ensure that replica operation has expected version type for replication
// ensure that versionTypeForReplicationAndRecovery is idempotent
assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery()
: "unexpected version type in request from [" + operation.origin().name() + "] " +
"found [" + operation.versionType().name() + "] " +
"expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]";
}
return true;
}

private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
if (origin == Operation.Origin.PRIMARY) {
assert assertOriginPrimarySequenceNumber(seqNo);
@@ -757,7 +743,6 @@ public class InternalEngine extends Engine {
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
assert assertVersionType(index);
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
@@ -860,9 +845,6 @@ public class InternalEngine extends Engine {
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
}
versionMap.enforceSafeAccess();
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
@@ -1096,7 +1078,6 @@ public class InternalEngine extends Engine {
public DeleteResult delete(Delete delete) throws IOException {
versionMap.enforceSafeAccess();
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
assert assertVersionType(delete);
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
final DeleteResult deleteResult;
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
@@ -1149,10 +1130,6 @@ public class InternalEngine extends Engine {

private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// drop out of order operations
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ delete.versionType() + "]";
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";

@@ -645,22 +645,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
long autoGeneratedTimestamp, boolean isRetry) throws IOException {
assert versionType.validateVersionForWrites(version);
return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp,
isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}

public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse)
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
boolean isRetry, SourceToParse sourceToParse)
throws IOException {
return applyIndexOperation(seqNo, primaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
return applyIndexOperation(seqNo, primaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
Engine.Operation.Origin.REPLICA, sourceToParse);
}

private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType,
private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
SourceToParse sourceToParse) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
ensureWriteAllowed(origin);
Engine.Index operation;
try {
@@ -736,19 +736,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
throws IOException {
assert versionType.validateVersionForWrites(version);
return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType,
Engine.Operation.Origin.PRIMARY);
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id,
VersionType versionType) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA);
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
}

private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
VersionType versionType, Engine.Operation.Origin origin) throws IOException {
@Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
ensureWriteAllowed(origin);
// When there is a single type, the unique identifier is only composed of the _id,
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue
@@ -1211,14 +1210,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin,
null, index.getAutoGeneratedIdTimestamp(), true, origin,
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source())).routing(index.routing()));
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
delete.versionType().versionTypeForReplicationAndRecovery(), origin);
null, origin);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;

@@ -1011,7 +1011,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC

public static final int FORMAT_6_0 = 8; // since 6.0.0
public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0
public static final int SERIALIZATION_FORMAT = FORMAT_NO_PARENT;
public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1;
public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE;

private final String id;
private final long autoGeneratedIdTimestamp;
@@ -1019,7 +1020,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final long seqNo;
private final long primaryTerm;
private final long version;
private final VersionType versionType;
private final BytesReference source;
private final String routing;

@@ -1034,8 +1034,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
in.readOptionalString(); // _parent
}
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version;
if (format < FORMAT_NO_VERSION_TYPE) {
in.readByte(); // _version_type
}
this.autoGeneratedIdTimestamp = in.readLong();
seqNo = in.readLong();
primaryTerm = in.readLong();
@@ -1049,15 +1050,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
this.seqNo = indexResult.getSeqNo();
this.primaryTerm = index.primaryTerm();
this.version = indexResult.getVersion();
this.versionType = index.versionType();
this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
}

public Index(String type, String id, long seqNo, long primaryTerm, byte[] source) {
this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, -1);
this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1);
}

public Index(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType,
public Index(String type, String id, long seqNo, long primaryTerm, long version,
byte[] source, String routing, long autoGeneratedIdTimestamp) {
this.type = type;
this.id = id;
@@ -1065,7 +1065,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
this.routing = routing;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
}
@@ -1110,24 +1109,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return this.version;
}

public VersionType versionType() {
return versionType;
}

@Override
public Source getSource() {
return new Source(source, routing);
}

private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
final int format = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) ? SERIALIZATION_FORMAT : FORMAT_6_0;
out.writeVInt(format);
out.writeString(id);
out.writeString(type);
out.writeBytesReference(source);
out.writeOptionalString(routing);
out.writeLong(version);

out.writeByte(versionType.getValue());
if (format < FORMAT_NO_VERSION_TYPE) {
out.writeByte(VersionType.EXTERNAL.getValue());
}
out.writeLong(autoGeneratedIdTimestamp);
out.writeLong(seqNo);
out.writeLong(primaryTerm);
@@ -1149,7 +1146,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
primaryTerm != index.primaryTerm ||
id.equals(index.id) == false ||
type.equals(index.type) == false ||
versionType != index.versionType ||
autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp ||
source.equals(index.source) == false) {
return false;
@@ -1168,7 +1164,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
result = 31 * result + Long.hashCode(seqNo);
result = 31 * result + Long.hashCode(primaryTerm);
result = 31 * result + Long.hashCode(version);
result = 31 * result + versionType.hashCode();
result = 31 * result + source.hashCode();
result = 31 * result + (routing != null ? routing.hashCode() : 0);
result = 31 * result + Long.hashCode(autoGeneratedIdTimestamp);
@@ -1194,14 +1189,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public static class Delete implements Operation {

private static final int FORMAT_6_0 = 4; // 6.0 - *
public static final int SERIALIZATION_FORMAT = FORMAT_6_0;
public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0
public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1;
public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE;

private final String type, id;
private final Term uid;
private final long seqNo;
private final long primaryTerm;
private final long version;
private final VersionType versionType;

private Delete(final StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
@@ -1210,29 +1206,29 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
id = in.readString();
uid = new Term(in.readString(), in.readBytesRef());
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version);
if (format < FORMAT_NO_VERSION_TYPE) {
in.readByte(); // versionType
}
seqNo = in.readLong();
primaryTerm = in.readLong();
}

public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion(), delete.versionType());
this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion());
}

/** utility for testing */
public Delete(String type, String id, long seqNo, long primaryTerm, Term uid) {
this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL);
this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY);
}

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version) {
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
}

@Override
@@ -1271,23 +1267,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return this.version;
}

public VersionType versionType() {
return this.versionType;
}

@Override
public Source getSource() {
throw new IllegalStateException("trying to read doc source from delete operation");
}

private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
final int format = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) ? SERIALIZATION_FORMAT : FORMAT_6_0;
out.writeVInt(format);
out.writeString(type);
out.writeString(id);
out.writeString(uid.field());
out.writeBytesRef(uid.bytes());
out.writeLong(version);
out.writeByte(versionType.getValue());
if (format < FORMAT_NO_VERSION_TYPE) {
out.writeByte(VersionType.EXTERNAL.getValue());
}
out.writeLong(seqNo);
out.writeLong(primaryTerm);
}
@@ -1306,8 +1301,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return version == delete.version &&
seqNo == delete.seqNo &&
primaryTerm == delete.primaryTerm &&
uid.equals(delete.uid) &&
versionType == delete.versionType;
uid.equals(delete.uid);
}

@Override
@@ -1316,7 +1310,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
result = 31 * result + Long.hashCode(seqNo);
result = 31 * result + Long.hashCode(primaryTerm);
result = 31 * result + Long.hashCode(version);
result = 31 * result + versionType.hashCode();
return result;
}


@@ -202,9 +202,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
if (previous.v1().equals(data) == false) {
Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput()));
Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput()));
throw new AssertionError(
"seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " +
"prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2());
if (newOp.equals(prvOp) == false) {
throw new AssertionError(
"seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " +
"prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2());
}
}
} else {
seenSequenceNumbers.put(seqNo,

@@ -21,9 +21,7 @@ package org.elasticsearch.action.resync;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
@@ -38,7 +36,7 @@ public class ResyncReplicationRequestTests extends ESTestCase {
public void testSerialization() throws IOException {
final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8"));
final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1);
randomNonNegativeLong(), bytes, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});


@@ -1183,7 +1183,7 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(indexResult.getVersion(), equalTo(1L));

create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(),
create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
null, REPLICA, 0, -1, false);
indexResult = replicaEngine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
}
@@ -1197,7 +1197,7 @@ public class InternalEngineTests extends EngineTestCase {


create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(),
create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
null, REPLICA, 0, -1, false);
indexResult = replicaEngine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
assertTrue(indexResult.isCreated());
@@ -1216,7 +1216,7 @@ public class InternalEngineTests extends EngineTestCase {


update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(),
update.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
null, REPLICA, 0, -1, false);
updateResult = replicaEngine.index(update);
assertThat(updateResult.getVersion(), equalTo(2L));
assertFalse(updateResult.isCreated());
@@ -1269,7 +1269,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));

index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
}
@@ -1418,7 +1418,7 @@ public class InternalEngineTests extends EngineTestCase {
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? null : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis(), -1, false
);
@@ -1427,7 +1427,7 @@ public class InternalEngineTests extends EngineTestCase {
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? null : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
}
@@ -3221,7 +3221,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));

index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));

@@ -3235,7 +3235,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(1, topDocs.totalHits);
}

index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
replicaEngine.refresh("test");
@@ -3255,7 +3255,7 @@ public class InternalEngineTests extends EngineTestCase {
Engine.IndexResult result = engine.index(firstIndexRequest);
assertThat(result.getVersion(), equalTo(1L));

Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
Engine.IndexResult indexReplicaResult = replicaEngine.index(firstIndexRequestReplica);
assertThat(indexReplicaResult.getVersion(), equalTo(1L));

@@ -3269,7 +3269,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(1, topDocs.totalHits);
}

Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
replicaEngine.index(secondIndexRequestReplica);
replicaEngine.refresh("test");
try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) {
@@ -3292,7 +3292,7 @@ public class InternalEngineTests extends EngineTestCase {
}

public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, VersionType.EXTERNAL,
return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, null,
Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry);
}

@@ -3694,7 +3694,7 @@ public class InternalEngineTests extends EngineTestCase {
sequenceNumberSupplier.getAsLong(),
1,
i,
VersionType.EXTERNAL,
origin == PRIMARY ? VersionType.EXTERNAL : null,
origin,
System.nanoTime(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
@@ -3708,7 +3708,7 @@ public class InternalEngineTests extends EngineTestCase {
sequenceNumberSupplier.getAsLong(),
1,
i,
VersionType.EXTERNAL,
origin == PRIMARY ? VersionType.EXTERNAL : null,
origin,
System.nanoTime());
operations.add(delete);
@@ -3928,7 +3928,7 @@ public class InternalEngineTests extends EngineTestCase {
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
final Term uid = newUid(doc);
final long time = System.nanoTime();
actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, VersionType.EXTERNAL, REPLICA, time, time, false));
actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false));
if (rarely()) {
actualEngine.rollTranslogGeneration();
}
@@ -4686,7 +4686,7 @@ public class InternalEngineTests extends EngineTestCase {
for (int i = 0; i < seqNos.size(); i++) {
ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(), new BytesArray("{}"), null);
Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0,
1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false);
1, null, REPLICA, System.nanoTime(), -1, false);
engine.index(index);
if (randomBoolean()) {
engine.flush();

@@ -186,7 +186,6 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
remainingReplica.applyIndexOperationOnReplica(
remainingReplica.getLocalCheckpoint() + 1,
1,
VersionType.EXTERNAL,
randomNonNegativeLong(),
false,
SourceToParse.source("index", "type", "replica", new BytesArray("{}"), XContentType.JSON));

@@ -72,7 +72,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
@@ -1545,17 +1544,17 @@ public class IndexShardTests extends IndexShardTestCase {
* - If flush and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed.
*/
final IndexShard shard = newStartedShard(false);
shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id", VersionType.EXTERNAL);
shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id");
shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation
shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id", new BytesArray("{}"), XContentType.JSON));
shard.applyIndexOperationOnReplica(3, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(3, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-3", new BytesArray("{}"), XContentType.JSON));
// Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery.
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
shard.applyIndexOperationOnReplica(2, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(2, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-2", new BytesArray("{}"), XContentType.JSON));
shard.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-5", new BytesArray("{}"), XContentType.JSON));

final int translogOps;
@@ -1646,8 +1645,7 @@ public class IndexShardTests extends IndexShardTestCase {
updateMappings(otherShard, shard.indexSettings().getIndexMetaData());
SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), "_doc", "1",
new BytesArray("{}"), XContentType.JSON);
otherShard.applyIndexOperationOnReplica(1, 1,
VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
otherShard.applyIndexOperationOnReplica(1, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);

final ShardRouting primaryShardRouting = shard.routingEntry();
IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting,
@@ -1763,18 +1761,18 @@ public class IndexShardTests extends IndexShardTestCase {
final IndexShard shard = newStartedShard(false);
final String indexName = shard.shardId().getIndexName();
// Index #0, index #1
shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "_doc", "doc-0", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
shard.applyIndexOperationOnReplica(1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(1, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
// Simulate resync (without rollback): Noop #1, index #2
acquireReplicaOperationPermitBlockingly(shard, shard.primaryTerm + 1);
shard.markSeqNoAsNoop(1, "test");
shard.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON));
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2"));
@@ -2104,11 +2102,11 @@ public class IndexShardTests extends IndexShardTestCase {
int numCorruptEntries = 0;
for (int i = 0; i < numTotalEntries; i++) {
if (randomBoolean()) {
operations.add(new Translog.Index("_doc", "1", 0, primary.getPrimaryTerm(), 1, VersionType.INTERNAL,
operations.add(new Translog.Index("_doc", "1", 0, primary.getPrimaryTerm(), 1,
"{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, -1));
} else {
// corrupt entry
operations.add(new Translog.Index("_doc", "2", 1, primary.getPrimaryTerm(), 1, VersionType.INTERNAL,
operations.add(new Translog.Index("_doc", "2", 1, primary.getPrimaryTerm(), 1,
"{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, -1));
numCorruptEntries++;
}
@@ -2603,8 +2601,7 @@ public class IndexShardTests extends IndexShardTestCase {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i,
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
indexShard.applyIndexOperationOnReplica(i, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
if (!gap && i == localCheckpoint + 1) {
localCheckpoint++;
}

@@ -416,9 +416,9 @@ public class TranslogTests extends ESTestCase {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(1));
assertThat(stats.getTranslogSizeInBytes(), equalTo(163L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(162L));
assertThat(stats.getUncommittedOperations(), equalTo(1));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(163L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(162L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

@@ -426,9 +426,9 @@ public class TranslogTests extends ESTestCase {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(2));
assertThat(stats.getTranslogSizeInBytes(), equalTo(212L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(210L));
assertThat(stats.getUncommittedOperations(), equalTo(2));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(212L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(210L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

@@ -436,9 +436,9 @@ public class TranslogTests extends ESTestCase {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(3));
assertThat(stats.getTranslogSizeInBytes(), equalTo(261L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(258L));
assertThat(stats.getUncommittedOperations(), equalTo(3));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(261L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(258L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

@@ -446,13 +446,13 @@ public class TranslogTests extends ESTestCase {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(303L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(300L));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(303L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

final long expectedSizeInBytes = 358L;
final long expectedSizeInBytes = 355L;
translog.rollGeneration();
{
final TranslogStats stats = stats();
@@ -725,14 +725,12 @@ public class TranslogTests extends ESTestCase {
assertEquals(expIndexOp.type(), indexOp.type());
assertEquals(expIndexOp.source(), indexOp.source());
assertEquals(expIndexOp.version(), indexOp.version());
assertEquals(expIndexOp.versionType(), indexOp.versionType());
break;
case DELETE:
Translog.Delete delOp = (Translog.Delete) op;
Translog.Delete expDelOp = (Translog.Delete) expectedOp;
assertEquals(expDelOp.uid(), delOp.uid());
assertEquals(expDelOp.version(), delOp.version());
assertEquals(expDelOp.versionType(), delOp.versionType());
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) op;
@@ -1478,7 +1476,7 @@ public class TranslogTests extends ESTestCase {
try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3080, " +
assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3025, " +
"numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} but got: Checkpoint{offset=0, numOps=0, " +
"generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage());
}
@@ -1842,8 +1840,7 @@ public class TranslogTests extends ESTestCase {
new Term("_uid", threadId + "_" + opCount),
seqNoGenerator.getAndIncrement(),
primaryTerm.get(),
1 + randomInt(100000),
randomFrom(VersionType.values()));
1 + randomInt(100000));
break;
case NO_OP:
op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), primaryTerm.get(), randomAlphaOfLength(16));

@@ -122,22 +122,22 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
final String indexName = orgReplica.shardId().getIndexName();

// delete #1
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL);
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation
// index #0
orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
// index #3
orgReplica.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON));
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
orgReplica.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON));
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
// index #5 -> force NoOp #4.
orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));

final int translogOps;

@@ -52,7 +52,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.Mapping;
@@ -493,14 +492,12 @@ public abstract class EngineTestCase extends ESTestCase {

protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo,
boolean isRetry) {
return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL,
Engine.Operation.Origin.REPLICA, System.nanoTime(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
}

protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) {
return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL,
Engine.Operation.Origin.REPLICA, startTime);
return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, null, Engine.Operation.Origin.REPLICA, startTime);
}

/**

@@ -124,14 +124,12 @@ public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {
source(indexName, index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source()))
.routing(index.routing()), index.seqNo(), index.primaryTerm(),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
index.getAutoGeneratedIdTimestamp(), true);
index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true);
return engineIndex;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
origin, System.nanoTime());
delete.primaryTerm(), delete.version(), null, origin, System.nanoTime());
return engineDelete;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;

@@ -573,7 +573,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
shard.getLocalCheckpoint());
} else {
result = shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0,
VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new TransportReplicationAction.RetryOnReplicaException(shard.shardId,
"Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate());
@@ -591,7 +591,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
if (shard.routingEntry().primary()) {
return shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL);
} else {
return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id, VersionType.EXTERNAL);
return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id);
}
}