Add sequence numbers based optimistic concurrency control support to Engine (#36467)
This commit add support to engine operations for resolving and verifying the sequence number and primary term of the last modification to a document before performing an operation. This is infrastructure to move our (optimistic concurrency control)[http://en.wikipedia.org/wiki/Optimistic_concurrency_control] API to use sequence numbers instead of internal versioning. Relates #36148 Relates #10708
This commit is contained in:
parent
cd1bec3a06
commit
f6b5d7e013
|
@ -94,7 +94,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
* using the same cache key. Otherwise we'd have to disable caching
|
||||
* entirely for these readers.
|
||||
*/
|
||||
public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
|
||||
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
|
||||
throws IOException {
|
||||
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
|
||||
"context's reader is not the same as the reader class was initialized on.";
|
||||
|
@ -108,7 +108,28 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
if (versions.advanceExact(docID) == false) {
|
||||
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
|
||||
}
|
||||
return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase);
|
||||
final long seqNo;
|
||||
final long term;
|
||||
if (loadSeqNo) {
|
||||
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
|
||||
// remove the null check in 7.0 once we can't read indices with no seq#
|
||||
if (seqNos != null && seqNos.advanceExact(docID)) {
|
||||
seqNo = seqNos.longValue();
|
||||
} else {
|
||||
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
||||
if (terms != null && terms.advanceExact(docID)) {
|
||||
term = terms.longValue();
|
||||
} else {
|
||||
term = 0;
|
||||
}
|
||||
|
||||
} else {
|
||||
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
term = 0;
|
||||
}
|
||||
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -150,6 +171,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
|
||||
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
|
||||
final long seqNo;
|
||||
// remove the null check in 7.0 once we can't read indices with no seq#
|
||||
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
|
||||
seqNo = seqNoDV.longValue();
|
||||
} else {
|
||||
|
|
|
@ -31,8 +31,6 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
|
||||
|
||||
/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
|
||||
public final class VersionsAndSeqNoResolver {
|
||||
|
||||
|
@ -96,12 +94,16 @@ public final class VersionsAndSeqNoResolver {
|
|||
public static class DocIdAndVersion {
|
||||
public final int docId;
|
||||
public final long version;
|
||||
public final long seqNo;
|
||||
public final long primaryTerm;
|
||||
public final LeafReader reader;
|
||||
public final int docBase;
|
||||
|
||||
public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) {
|
||||
public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
|
||||
this.docId = docId;
|
||||
this.version = version;
|
||||
this.seqNo = seqNo;
|
||||
this.primaryTerm = primaryTerm;
|
||||
this.reader = reader;
|
||||
this.docBase = docBase;
|
||||
}
|
||||
|
@ -129,7 +131,7 @@ public final class VersionsAndSeqNoResolver {
|
|||
* <li>a doc ID and a version otherwise
|
||||
* </ul>
|
||||
*/
|
||||
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
|
||||
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
|
||||
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
|
@ -137,7 +139,7 @@ public final class VersionsAndSeqNoResolver {
|
|||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
final LeafReaderContext leaf = leaves.get(i);
|
||||
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
|
||||
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf);
|
||||
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
|
@ -175,15 +177,4 @@ public final class VersionsAndSeqNoResolver {
|
|||
}
|
||||
return latest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the version for the uid from the reader, returning<ul>
|
||||
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
|
||||
* <li>the version associated with the provided uid otherwise
|
||||
* </ul>
|
||||
*/
|
||||
public static long loadVersion(IndexReader reader, Term term) throws IOException {
|
||||
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
|
||||
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -606,7 +606,7 @@ public abstract class Engine implements Closeable {
|
|||
final Searcher searcher = searcherFactory.apply("get", scope);
|
||||
final DocIdAndVersion docIdAndVersion;
|
||||
try {
|
||||
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
|
||||
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid(), true);
|
||||
} catch (Exception e) {
|
||||
Releasables.closeWhileHandlingException(searcher);
|
||||
//TODO: A better exception goes here
|
||||
|
@ -1345,14 +1345,23 @@ public abstract class Engine implements Closeable {
|
|||
private final ParsedDocument doc;
|
||||
private final long autoGeneratedIdTimestamp;
|
||||
private final boolean isRetry;
|
||||
private final long ifSeqNoMatch;
|
||||
private final long ifPrimaryTermMatch;
|
||||
|
||||
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
|
||||
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
|
||||
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNoMatch, long ifPrimaryTermMatch) {
|
||||
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
|
||||
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
|
||||
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
|
||||
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
|
||||
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
|
||||
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
|
||||
"cas operations are only allowed if origin is primary. get [" + origin + "]";
|
||||
this.doc = doc;
|
||||
this.isRetry = isRetry;
|
||||
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
|
||||
this.ifSeqNoMatch = ifSeqNoMatch;
|
||||
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
|
||||
}
|
||||
|
||||
public Index(Term uid, long primaryTerm, ParsedDocument doc) {
|
||||
|
@ -1361,7 +1370,7 @@ public abstract class Engine implements Closeable {
|
|||
|
||||
Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
|
||||
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
|
||||
Origin.PRIMARY, System.nanoTime(), -1, false);
|
||||
Origin.PRIMARY, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
} // TEST ONLY
|
||||
|
||||
public ParsedDocument parsedDoc() {
|
||||
|
@ -1417,29 +1426,45 @@ public abstract class Engine implements Closeable {
|
|||
return isRetry;
|
||||
}
|
||||
|
||||
public long getIfSeqNoMatch() {
|
||||
return ifSeqNoMatch;
|
||||
}
|
||||
|
||||
public long getIfPrimaryTermMatch() {
|
||||
return ifPrimaryTermMatch;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Delete extends Operation {
|
||||
|
||||
private final String type;
|
||||
private final String id;
|
||||
private final long ifSeqNoMatch;
|
||||
private final long ifPrimaryTermMatch;
|
||||
|
||||
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
|
||||
Origin origin, long startTime) {
|
||||
Origin origin, long startTime, long ifSeqNoMatch, long ifPrimaryTermMatch) {
|
||||
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
|
||||
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
|
||||
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
|
||||
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
|
||||
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
|
||||
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
|
||||
"cas operations are only allowed if origin is primary. get [" + origin + "]";
|
||||
this.type = Objects.requireNonNull(type);
|
||||
this.id = Objects.requireNonNull(id);
|
||||
this.ifSeqNoMatch = ifSeqNoMatch;
|
||||
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
|
||||
}
|
||||
|
||||
public Delete(String type, String id, Term uid, long primaryTerm) {
|
||||
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
Origin.PRIMARY, System.nanoTime());
|
||||
Origin.PRIMARY, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
public Delete(Delete template, VersionType versionType) {
|
||||
this(template.type(), template.id(), template.uid(), template.seqNo(), template.primaryTerm(), template.version(),
|
||||
versionType, template.origin(), template.startTime());
|
||||
versionType, template.origin(), template.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1462,6 +1487,13 @@ public abstract class Engine implements Closeable {
|
|||
return (uid().field().length() + uid().text().length()) * 2 + 20;
|
||||
}
|
||||
|
||||
public long getIfSeqNoMatch() {
|
||||
return ifSeqNoMatch;
|
||||
}
|
||||
|
||||
public long getIfPrimaryTermMatch() {
|
||||
return ifPrimaryTermMatch;
|
||||
}
|
||||
}
|
||||
|
||||
public static class NoOp extends Operation {
|
||||
|
|
|
@ -627,11 +627,12 @@ public class InternalEngine extends Engine {
|
|||
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
|
||||
if (operation != null) {
|
||||
// in the case of a already pruned translog generation we might get null here - yet very unlikely
|
||||
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
|
||||
final Translog.Index index = (Translog.Index) operation;
|
||||
TranslogLeafReader reader = new TranslogLeafReader(index, engineConfig
|
||||
.getIndexSettings().getIndexVersionCreated());
|
||||
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close),
|
||||
new VersionsAndSeqNoResolver.DocIdAndVersion(0,
|
||||
((Translog.Index) operation).version(), reader, 0));
|
||||
new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
|
||||
reader, 0));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
|
||||
|
@ -708,14 +709,17 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
/** resolves the current version of the document, returning null if not found */
|
||||
private VersionValue resolveDocVersion(final Operation op) throws IOException {
|
||||
private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) throws IOException {
|
||||
assert incrementVersionLookup(); // used for asserting in tests
|
||||
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
|
||||
if (versionValue == null) {
|
||||
assert incrementIndexVersionLookup(); // used for asserting in tests
|
||||
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
|
||||
if (currentVersion != Versions.NOT_FOUND) {
|
||||
versionValue = new IndexVersionValue(null, currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L);
|
||||
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion;
|
||||
try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
|
||||
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), op.uid(), loadSeqNo);
|
||||
}
|
||||
if (docIdAndVersion != null) {
|
||||
versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
|
||||
}
|
||||
} else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() &&
|
||||
(engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) {
|
||||
|
@ -960,7 +964,8 @@ public class InternalEngine extends Engine {
|
|||
} else {
|
||||
versionMap.enforceSafeAccess();
|
||||
// resolves incoming version
|
||||
final VersionValue versionValue = resolveDocVersion(index);
|
||||
final VersionValue versionValue =
|
||||
resolveDocVersion(index, index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
final long currentVersion;
|
||||
final boolean currentNotFoundOrDeleted;
|
||||
if (versionValue == null) {
|
||||
|
@ -970,7 +975,17 @@ public class InternalEngine extends Engine {
|
|||
currentVersion = versionValue.version;
|
||||
currentNotFoundOrDeleted = versionValue.isDelete();
|
||||
}
|
||||
if (index.versionType().isVersionConflictForWrites(
|
||||
if (index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
|
||||
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.type(), index.id(),
|
||||
index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
|
||||
} else if (index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
|
||||
versionValue.seqNo != index.getIfSeqNoMatch() || versionValue.term != index.getIfPrimaryTermMatch()
|
||||
)) {
|
||||
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.type(), index.id(),
|
||||
index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch(), versionValue.seqNo, versionValue.term);
|
||||
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
|
||||
} else if (index.versionType().isVersionConflictForWrites(
|
||||
currentVersion, index.version(), currentNotFoundOrDeleted)) {
|
||||
final VersionConflictEngineException e =
|
||||
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
|
||||
|
@ -1011,7 +1026,7 @@ public class InternalEngine extends Engine {
|
|||
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
|
||||
addDocs(index.docs(), indexWriter);
|
||||
}
|
||||
return new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
|
||||
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
|
||||
} catch (Exception ex) {
|
||||
if (indexWriter.getTragicException() == null) {
|
||||
/* There is no tragic event recorded so this must be a document failure.
|
||||
|
@ -1027,7 +1042,7 @@ public class InternalEngine extends Engine {
|
|||
* we return a `MATCH_ANY` version to indicate no document was index. The value is
|
||||
* not used anyway
|
||||
*/
|
||||
return new IndexResult(ex, Versions.MATCH_ANY, getPrimaryTerm(), plan.seqNoForIndexing);
|
||||
return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), plan.seqNoForIndexing);
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
|
@ -1287,7 +1302,7 @@ public class InternalEngine extends Engine {
|
|||
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
|
||||
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
|
||||
// resolve operation from external to internal
|
||||
final VersionValue versionValue = resolveDocVersion(delete);
|
||||
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
assert incrementVersionLookup();
|
||||
final long currentVersion;
|
||||
final boolean currentlyDeleted;
|
||||
|
@ -1299,7 +1314,17 @@ public class InternalEngine extends Engine {
|
|||
currentlyDeleted = versionValue.isDelete();
|
||||
}
|
||||
final DeletionStrategy plan;
|
||||
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
|
||||
if (delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
|
||||
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.type(), delete.id(),
|
||||
delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
|
||||
} else if (delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
|
||||
versionValue.seqNo != delete.getIfSeqNoMatch() || versionValue.term != delete.getIfPrimaryTermMatch()
|
||||
)) {
|
||||
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.type(), delete.id(),
|
||||
delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch(), versionValue.seqNo, versionValue.term);
|
||||
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
|
||||
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
|
||||
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
|
||||
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
|
||||
} else {
|
||||
|
@ -1346,7 +1371,7 @@ public class InternalEngine extends Engine {
|
|||
if (indexWriter.getTragicException() == null) {
|
||||
// there is no tragic event and such it must be a document level failure
|
||||
return new DeleteResult(
|
||||
ex, plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
|
||||
ex, plan.versionOfDeletion, delete.primaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
|
@ -2066,13 +2091,6 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
|
||||
assert incrementIndexVersionLookup();
|
||||
try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
|
||||
return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
|
||||
}
|
||||
}
|
||||
|
||||
private IndexWriter createWriter() throws IOException {
|
||||
try {
|
||||
final IndexWriterConfig iwc = getIndexWriterConfig();
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
|
@ -30,6 +31,16 @@ public class VersionConflictEngineException extends EngineException {
|
|||
this(shardId, op.type(), op.id(), op.versionType().explainConflictForWrites(currentVersion, op.version(), deleted));
|
||||
}
|
||||
|
||||
public VersionConflictEngineException(ShardId shardId, String type, String id,
|
||||
long compareAndWriteSeqNo, long compareAndWriteTerm,
|
||||
long currentSeqNo, long currentTerm) {
|
||||
this(shardId, type, id, "required seqNo [" + compareAndWriteSeqNo + "], primary term [" + compareAndWriteTerm +"]." +
|
||||
(currentSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ?
|
||||
" but no document was found" :
|
||||
" current document has seqNo [" + currentSeqNo + "] and primary term ["+ currentTerm + "]"
|
||||
));
|
||||
}
|
||||
|
||||
public VersionConflictEngineException(ShardId shardId, String type, String id, String explanation) {
|
||||
this(shardId, null, type, id, explanation);
|
||||
}
|
||||
|
|
|
@ -735,7 +735,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
doc.addDynamicMappingsUpdate(docMapper.getMapping());
|
||||
}
|
||||
Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(doc.id()));
|
||||
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
|
||||
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry,
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
|
||||
|
@ -834,7 +835,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
private Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
|
||||
VersionType versionType, Engine.Operation.Origin origin) {
|
||||
long startTime = System.nanoTime();
|
||||
return new Engine.Delete(resolveType(type), id, uid, seqNo, primaryTerm, version, versionType, origin, startTime);
|
||||
return new Engine.Delete(resolveType(type), id, uid, seqNo, primaryTerm, version, versionType, origin, startTime,
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException {
|
||||
|
|
|
@ -58,19 +58,19 @@ public class VersionLookupTests extends ESTestCase {
|
|||
LeafReaderContext segment = reader.leaves().get(0);
|
||||
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
|
||||
// found doc
|
||||
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), segment);
|
||||
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment);
|
||||
assertNotNull(result);
|
||||
assertEquals(87, result.version);
|
||||
assertEquals(0, result.docId);
|
||||
// not found doc
|
||||
assertNull(lookup.lookupVersion(new BytesRef("7"), segment));
|
||||
assertNull(lookup.lookupVersion(new BytesRef("7"), randomBoolean(), segment));
|
||||
// deleted doc
|
||||
writer.deleteDocuments(new Term(IdFieldMapper.NAME, "6"));
|
||||
reader.close();
|
||||
reader = DirectoryReader.open(writer);
|
||||
segment = reader.leaves().get(0);
|
||||
lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
|
||||
assertNull(lookup.lookupVersion(new BytesRef("6"), segment));
|
||||
assertNull(lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment));
|
||||
reader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
|
@ -93,7 +93,7 @@ public class VersionLookupTests extends ESTestCase {
|
|||
LeafReaderContext segment = reader.leaves().get(0);
|
||||
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
|
||||
// return the last doc when there are duplicates
|
||||
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), segment);
|
||||
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment);
|
||||
assertNotNull(result);
|
||||
assertEquals(87, result.version);
|
||||
assertEquals(1, result.docId);
|
||||
|
@ -103,7 +103,7 @@ public class VersionLookupTests extends ESTestCase {
|
|||
reader = DirectoryReader.open(writer);
|
||||
segment = reader.leaves().get(0);
|
||||
lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
|
||||
result = lookup.lookupVersion(new BytesRef("6"), segment);
|
||||
result = lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment);
|
||||
assertNotNull(result);
|
||||
assertEquals(87, result.version);
|
||||
assertEquals(1, result.docId);
|
||||
|
@ -113,7 +113,7 @@ public class VersionLookupTests extends ESTestCase {
|
|||
reader = DirectoryReader.open(writer);
|
||||
segment = reader.leaves().get(0);
|
||||
lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
|
||||
assertNull(lookup.lookupVersion(new BytesRef("6"), segment));
|
||||
assertNull(lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment));
|
||||
reader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.elasticsearch.index.mapper.VersionFieldMapper;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -42,7 +41,6 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion;
|
||||
import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadVersion;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
@ -66,15 +64,14 @@ public class VersionsTests extends ESTestCase {
|
|||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
||||
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1));
|
||||
MatcherAssert.assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()), nullValue());
|
||||
|
||||
Document doc = new Document();
|
||||
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
|
||||
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(1L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")).version, equalTo(1L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L));
|
||||
|
||||
doc = new Document();
|
||||
Field uid = new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE);
|
||||
|
@ -83,8 +80,7 @@ public class VersionsTests extends ESTestCase {
|
|||
doc.add(version);
|
||||
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(2L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")).version, equalTo(2L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L));
|
||||
|
||||
// test reuse of uid field
|
||||
doc = new Document();
|
||||
|
@ -94,13 +90,11 @@ public class VersionsTests extends ESTestCase {
|
|||
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
|
||||
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(3L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")).version, equalTo(3L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(3L));
|
||||
|
||||
writer.deleteDocuments(new Term(IdFieldMapper.NAME, "1"));
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), nullValue());
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()), nullValue());
|
||||
directoryReader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
|
@ -126,21 +120,18 @@ public class VersionsTests extends ESTestCase {
|
|||
|
||||
writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
|
||||
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1));
|
||||
assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(5L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")).version, equalTo(5L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(5L));
|
||||
|
||||
version.setLongValue(6L);
|
||||
writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
|
||||
version.setLongValue(7L);
|
||||
writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(7L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")).version, equalTo(7L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(7L));
|
||||
|
||||
writer.deleteDocuments(new Term(IdFieldMapper.NAME, "1"));
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1")), nullValue());
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()), nullValue());
|
||||
directoryReader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
|
@ -158,10 +149,10 @@ public class VersionsTests extends ESTestCase {
|
|||
writer.addDocument(doc);
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
// should increase cache size by 1
|
||||
assertEquals(87, loadVersion(reader, new Term(IdFieldMapper.NAME, "6")));
|
||||
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
|
||||
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
|
||||
// should be cache hit
|
||||
assertEquals(87, loadVersion(reader, new Term(IdFieldMapper.NAME, "6")));
|
||||
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
|
||||
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
|
||||
|
||||
reader.close();
|
||||
|
@ -182,11 +173,11 @@ public class VersionsTests extends ESTestCase {
|
|||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
|
||||
writer.addDocument(doc);
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
assertEquals(87, loadVersion(reader, new Term(IdFieldMapper.NAME, "6")));
|
||||
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
|
||||
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
|
||||
// now wrap the reader
|
||||
DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5));
|
||||
assertEquals(87, loadVersion(wrapped, new Term(IdFieldMapper.NAME, "6")));
|
||||
assertEquals(87, loadDocIdAndVersion(wrapped, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
|
||||
// same size map: core cache key is shared
|
||||
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
|
||||
|
||||
|
|
|
@ -79,6 +79,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
|
|||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.TriFunction;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -171,6 +172,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS
|
|||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
|
||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.sameInstance;
|
||||
|
@ -590,7 +592,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
public void testCommitStats() throws IOException {
|
||||
final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
|
||||
try (
|
||||
Store store = createStore();
|
||||
InternalEngine engine = createEngine(store, createTempDir(), (maxSeq, localCP) -> new LocalCheckpointTracker(
|
||||
|
@ -626,7 +628,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
rarely() || maxSeqNo.get() == SequenceNumbers.NO_OPS_PERFORMED ?
|
||||
SequenceNumbers.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
|
||||
globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbers.NO_OPS_PERFORMED ?
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get()));
|
||||
UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get()));
|
||||
|
||||
final Engine.CommitId commitId = engine.flush(true, true);
|
||||
|
||||
|
@ -707,15 +709,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
for (int i = 0; i < ops; i++) {
|
||||
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
|
||||
if (randomBoolean()) {
|
||||
final Engine.Index operation = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
0, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(),
|
||||
-1, false);
|
||||
final Engine.Index operation = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO,
|
||||
0, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
operations.add(operation);
|
||||
initialEngine.index(operation);
|
||||
} else {
|
||||
final Engine.Delete operation = new Engine.Delete("test", "1", newUid(doc),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0, i, VersionType.EXTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime());
|
||||
final Engine.Delete operation = new Engine.Delete("test", "1", newUid(doc), UNASSIGNED_SEQ_NO, 0, i,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0);
|
||||
operations.add(operation);
|
||||
initialEngine.delete(operation);
|
||||
}
|
||||
|
@ -1183,10 +1183,10 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final ParsedDocument parsedDoc3 =
|
||||
testParsedDocument("3", null, testDocumentWithTextField(), B_1, null);
|
||||
if (forceMergeFlushes) {
|
||||
engine.index(new Engine.Index(newUid(parsedDoc3), parsedDoc3, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
engine.index(new Engine.Index(newUid(parsedDoc3), parsedDoc3, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY,
|
||||
System.nanoTime() - engine.engineConfig.getFlushMergesAfter().nanos(),
|
||||
-1, false));
|
||||
-1, false, UNASSIGNED_SEQ_NO, 0));
|
||||
} else {
|
||||
engine.index(indexForDoc(parsedDoc3));
|
||||
}
|
||||
|
@ -1253,7 +1253,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
if (randomBoolean()) {
|
||||
final String translogUUID = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm.get());
|
||||
UNASSIGNED_SEQ_NO, shardId, primaryTerm.get());
|
||||
store.associateIndexWithNewTranslog(translogUUID);
|
||||
}
|
||||
trimUnsafeCommits(config);
|
||||
|
@ -1292,7 +1292,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
|
||||
create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(),
|
||||
null, REPLICA, 0, -1, false);
|
||||
null, REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = replicaEngine.index(create);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
}
|
||||
|
@ -1306,7 +1306,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
|
||||
|
||||
create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(),
|
||||
null, REPLICA, 0, -1, false);
|
||||
null, REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = replicaEngine.index(create);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
assertTrue(indexResult.isCreated());
|
||||
|
@ -1325,7 +1325,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
|
||||
|
||||
update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(),
|
||||
null, REPLICA, 0, -1, false);
|
||||
null, REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
updateResult = replicaEngine.index(update);
|
||||
assertThat(updateResult.getVersion(), equalTo(2L));
|
||||
assertFalse(updateResult.isCreated());
|
||||
|
@ -1382,7 +1382,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
|
||||
index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(),
|
||||
null, REPLICA, 0, -1, false);
|
||||
null, REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
}
|
||||
|
@ -1669,13 +1669,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
|
||||
public void testVersioningCreateExistsException() throws IOException {
|
||||
ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
|
||||
Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(create);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
|
||||
create = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED,
|
||||
VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED,
|
||||
VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = engine.index(create);
|
||||
assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE));
|
||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
@ -1721,11 +1721,12 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Document doc = testDocumentWithTextField(index.docs().get(0).get("value"));
|
||||
ParsedDocument parsedDocument = testParsedDocument(index.id(), index.routing(), doc, index.source(), null);
|
||||
return new Engine.Index(index.uid(), parsedDocument, newSeqNo, index.primaryTerm(), index.version(),
|
||||
index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry());
|
||||
index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(),
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
} else {
|
||||
Engine.Delete delete = (Engine.Delete) operation;
|
||||
return new Engine.Delete(delete.type(), delete.id(), delete.uid(), newSeqNo, delete.primaryTerm(),
|
||||
delete.version(), delete.versionType(), delete.origin(), delete.startTime());
|
||||
delete.version(), delete.versionType(), delete.origin(), delete.startTime(), UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
};
|
||||
final List<Engine.Operation> allOps = new ArrayList<>();
|
||||
|
@ -1799,33 +1800,65 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
String lastFieldValue = null;
|
||||
int opsPerformed = 0;
|
||||
long lastOpVersion = currentOpVersion;
|
||||
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
|
||||
long lastOpTerm = 0;
|
||||
final AtomicLong currentTerm = new AtomicLong(1);
|
||||
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
|
||||
index.seqNo(), index.primaryTerm(), version, index.versionType(), index.origin(), index.startTime(),
|
||||
index.getAutoGeneratedIdTimestamp(), index.isRetry());
|
||||
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
|
||||
index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0);
|
||||
BiFunction<Long, Engine.Delete, Engine.Delete> delWithVersion = (version, delete) -> new Engine.Delete(delete.type(), delete.id(),
|
||||
delete.uid(), delete.seqNo(), delete.primaryTerm(), version, delete.versionType(), delete.origin(), delete.startTime());
|
||||
delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), version, delete.versionType(), delete.origin(), delete.startTime(),
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
TriFunction<Long, Long, Engine.Index, Engine.Index> indexWithSeq = (seqNo, term, index) -> new Engine.Index(index.uid(),
|
||||
index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(),
|
||||
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), seqNo, term);
|
||||
TriFunction<Long, Long, Engine.Delete, Engine.Delete> delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(),
|
||||
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
|
||||
delete.startTime(), seqNo, term);
|
||||
for (Engine.Operation op : ops) {
|
||||
final boolean versionConflict = rarely();
|
||||
final boolean versionedOp = versionConflict || randomBoolean();
|
||||
final long conflictingVersion = docDeleted || randomBoolean() ?
|
||||
lastOpVersion + (randomBoolean() ? 1 : -1) :
|
||||
Versions.MATCH_DELETED;
|
||||
final long conflictingSeqNo = lastOpSeqNo == UNASSIGNED_SEQ_NO || randomBoolean() ?
|
||||
lastOpSeqNo + 5 : // use 5 to go above 0 for magic numbers
|
||||
lastOpSeqNo;
|
||||
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
|
||||
if (rarely()) {
|
||||
currentTerm.incrementAndGet();
|
||||
}
|
||||
final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
|
||||
logger.info("performing [{}]{}{}",
|
||||
op.operationType().name().charAt(0),
|
||||
versionConflict ? " (conflict " + conflictingVersion + ")" : "",
|
||||
versionedOp ? " (versioned " + correctVersion + ")" : "");
|
||||
versionedOp ? " (versioned " + correctVersion + ", seqNo " + lastOpSeqNo + ", term " + lastOpTerm + " )" : "");
|
||||
if (op instanceof Engine.Index) {
|
||||
final Engine.Index index = (Engine.Index) op;
|
||||
if (versionConflict) {
|
||||
// generate a conflict
|
||||
Engine.IndexResult result = engine.index(indexWithVersion.apply(conflictingVersion, index));
|
||||
final Engine.IndexResult result;
|
||||
if (randomBoolean()) {
|
||||
result = engine.index(indexWithSeq.apply(conflictingSeqNo, conflictingTerm, index));
|
||||
} else {
|
||||
result = engine.index(indexWithVersion.apply(conflictingVersion, index));
|
||||
}
|
||||
assertThat(result.isCreated(), equalTo(false));
|
||||
assertThat(result.getVersion(), equalTo(lastOpVersion));
|
||||
assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
|
||||
assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
} else {
|
||||
Engine.IndexResult result = engine.index(versionedOp ? indexWithVersion.apply(correctVersion, index) : index);
|
||||
final Engine.IndexResult result;
|
||||
if (versionedOp) {
|
||||
// TODO: add support for non-existing docs
|
||||
if (randomBoolean() && lastOpSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
result = engine.index(indexWithSeq.apply(lastOpSeqNo, lastOpTerm, index));
|
||||
} else {
|
||||
result = engine.index(indexWithVersion.apply(correctVersion, index));
|
||||
}
|
||||
} else {
|
||||
result = engine.index(index);
|
||||
}
|
||||
assertThat(result.isCreated(), equalTo(docDeleted));
|
||||
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
|
||||
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
|
||||
|
@ -1833,25 +1866,41 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
lastFieldValue = index.docs().get(0).get("value");
|
||||
docDeleted = false;
|
||||
lastOpVersion = result.getVersion();
|
||||
lastOpSeqNo = result.getSeqNo();
|
||||
lastOpTerm = result.getTerm();
|
||||
opsPerformed++;
|
||||
}
|
||||
} else {
|
||||
final Engine.Delete delete = (Engine.Delete) op;
|
||||
if (versionConflict) {
|
||||
// generate a conflict
|
||||
Engine.DeleteResult result = engine.delete(delWithVersion.apply(conflictingVersion, delete));
|
||||
Engine.DeleteResult result;
|
||||
if (randomBoolean()) {
|
||||
result = engine.delete(delWithSeq.apply(conflictingSeqNo, conflictingTerm, delete));
|
||||
} else {
|
||||
result = engine.delete(delWithVersion.apply(conflictingVersion, delete));
|
||||
}
|
||||
assertThat(result.isFound(), equalTo(docDeleted == false));
|
||||
assertThat(result.getVersion(), equalTo(lastOpVersion));
|
||||
assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
|
||||
assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
} else {
|
||||
Engine.DeleteResult result = engine.delete(versionedOp ? delWithVersion.apply(correctVersion, delete) : delete);
|
||||
final Engine.DeleteResult result;
|
||||
if (versionedOp && lastOpSeqNo != UNASSIGNED_SEQ_NO && randomBoolean()) {
|
||||
result = engine.delete(delWithSeq.apply(lastOpSeqNo, lastOpTerm, delete));
|
||||
} else if (versionedOp) {
|
||||
result = engine.delete(delWithVersion.apply(correctVersion, delete));
|
||||
} else {
|
||||
result = engine.delete(delete);
|
||||
}
|
||||
assertThat(result.isFound(), equalTo(docDeleted == false));
|
||||
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
|
||||
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
|
||||
assertThat(result.getFailure(), nullValue());
|
||||
docDeleted = true;
|
||||
lastOpVersion = result.getVersion();
|
||||
lastOpSeqNo = UNASSIGNED_SEQ_NO;
|
||||
lastOpTerm = 0;
|
||||
opsPerformed++;
|
||||
}
|
||||
}
|
||||
|
@ -2065,9 +2114,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Engine.Index index = new Engine.Index(uidTerm,
|
||||
testParsedDocument("1", null, testDocument(),
|
||||
bytesArray(Strings.collectionToCommaDelimitedString(values)), null),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 2,
|
||||
UNASSIGNED_SEQ_NO, 2,
|
||||
get.version(), VersionType.INTERNAL,
|
||||
PRIMARY, System.currentTimeMillis(), -1, false);
|
||||
PRIMARY, System.currentTimeMillis(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(index);
|
||||
if (indexResult.getResultType() == Engine.Result.Type.SUCCESS) {
|
||||
history.add(new OpAndVersion(indexResult.getVersion(), removed, added));
|
||||
|
@ -2214,8 +2263,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
// we have some docs indexed, so delete one of them
|
||||
id = randomFrom(indexedIds);
|
||||
final Engine.Delete delete = new Engine.Delete(
|
||||
"test", id, newUid(id), SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm.get(),
|
||||
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
|
||||
"test", id, newUid(id), UNASSIGNED_SEQ_NO, primaryTerm.get(),
|
||||
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0, UNASSIGNED_SEQ_NO, 0);
|
||||
final Engine.DeleteResult result = initialEngine.delete(delete);
|
||||
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
|
||||
assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1));
|
||||
|
@ -2223,7 +2272,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
indexedIds.remove(id);
|
||||
primarySeqNo++;
|
||||
} else {
|
||||
assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
|
||||
assertThat(result.getSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
|
||||
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo));
|
||||
}
|
||||
} else {
|
||||
|
@ -2231,9 +2280,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
id = randomFrom(ids);
|
||||
ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
|
||||
final Engine.Index index = new Engine.Index(newUid(doc), doc,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm.get(),
|
||||
UNASSIGNED_SEQ_NO, primaryTerm.get(),
|
||||
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
PRIMARY, 0, -1, false);
|
||||
PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
final Engine.IndexResult result = initialEngine.index(index);
|
||||
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
|
||||
assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1));
|
||||
|
@ -2241,7 +2290,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
indexedIds.add(id);
|
||||
primarySeqNo++;
|
||||
} else {
|
||||
assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
|
||||
assertThat(result.getSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
|
||||
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo));
|
||||
}
|
||||
}
|
||||
|
@ -2371,7 +2420,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
SequenceNumbers.NO_OPS_PERFORMED;
|
||||
long maxSeqNo = userData.containsKey(SequenceNumbers.MAX_SEQ_NO) ?
|
||||
Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) :
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
UNASSIGNED_SEQ_NO;
|
||||
// local checkpoint and max seq no shouldn't go backwards
|
||||
assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint));
|
||||
assertThat(maxSeqNo, greaterThanOrEqualTo(prevMaxSeqNo));
|
||||
|
@ -2485,13 +2534,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
document.add(new TextField("value", "test1", Field.Store.YES));
|
||||
|
||||
ParsedDocument doc = testParsedDocument("1", null, document, B_2, null);
|
||||
engine.index(new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 1,
|
||||
engine.index(new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1,
|
||||
VersionType.EXTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false));
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0));
|
||||
|
||||
// Delete document we just added:
|
||||
engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
|
||||
engine.delete(new Engine.Delete("test", "1", newUid(doc), UNASSIGNED_SEQ_NO, 0,
|
||||
10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0));
|
||||
|
||||
// Get should not find the document
|
||||
Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory);
|
||||
|
@ -2505,8 +2554,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
|
||||
// Delete non-existent document
|
||||
engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
|
||||
engine.delete(new Engine.Delete("test", "2", newUid("2"), UNASSIGNED_SEQ_NO, 0,
|
||||
10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0));
|
||||
|
||||
// Get should not find the document (we never indexed uid=2):
|
||||
getResult = engine.get(new Engine.Get(true, false, "type", "2", newUid("2")),
|
||||
|
@ -2514,8 +2563,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertThat(getResult.exists(), equalTo(false));
|
||||
|
||||
// Try to index uid=1 with a too-old version, should fail:
|
||||
Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 2,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false);
|
||||
Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(index);
|
||||
assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE));
|
||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
@ -2525,8 +2574,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertThat(getResult.exists(), equalTo(false));
|
||||
|
||||
// Try to index uid=2 with a too-old version, should fail:
|
||||
Engine.Index index1 = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 2,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false);
|
||||
Engine.Index index1 = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = engine.index(index1);
|
||||
assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE));
|
||||
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
@ -2612,8 +2661,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
store.associateIndexWithNewTranslog(translogUUID);
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(),
|
||||
new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
engine.index(firstIndexRequest);
|
||||
|
@ -2702,8 +2751,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
// expected
|
||||
}
|
||||
// when a new translog is created it should be ok
|
||||
final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm);
|
||||
final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, UNASSIGNED_SEQ_NO, shardId, primaryTerm);
|
||||
store.associateIndexWithNewTranslog(translogUUID);
|
||||
EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null);
|
||||
engine = new InternalEngine(config);
|
||||
|
@ -2716,10 +2764,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final int numDocs = randomIntBetween(1, 10);
|
||||
try (InternalEngine engine = createEngine(store, translogPath)) {
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(),
|
||||
new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
}
|
||||
|
@ -2813,10 +2860,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
public void testSkipTranslogReplay() throws IOException {
|
||||
final int numDocs = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null,
|
||||
testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
}
|
||||
|
@ -2855,11 +2901,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpoint();
|
||||
final int numDocs = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(),
|
||||
new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY,
|
||||
System.nanoTime(), -1, false);
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
}
|
||||
|
@ -2891,10 +2935,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
|
||||
final boolean flush = randomBoolean();
|
||||
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
|
||||
ParsedDocument doc =
|
||||
testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
1, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false);
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1,
|
||||
VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
if (flush) {
|
||||
|
@ -2903,8 +2946,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
|
||||
doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 2,
|
||||
VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false);
|
||||
Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2,
|
||||
VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult result = engine.index(idxRequest);
|
||||
engine.refresh("test");
|
||||
assertThat(result.getVersion(), equalTo(2L));
|
||||
|
@ -2937,10 +2980,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
public void testRecoverFromForeignTranslog() throws IOException {
|
||||
final int numDocs = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
ParsedDocument doc =
|
||||
testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult index = engine.index(firstIndexRequest);
|
||||
assertThat(index.getVersion(), equalTo(1L));
|
||||
}
|
||||
|
@ -2968,7 +3010,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
|
||||
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
|
||||
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null,
|
||||
new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
|
||||
new NoneCircuitBreakerService(), () -> UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
|
||||
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
|
||||
|
||||
engine = createEngine(store, primaryTranslogDir); // and recover again!
|
||||
|
@ -3265,7 +3307,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Engine.Index retry = appendOnlyReplica(doc, true, 1, randomIntBetween(0, 5));
|
||||
Engine.Delete delete = new Engine.Delete(operation.type(), operation.id(), operation.uid(),
|
||||
Math.max(retry.seqNo(), operation.seqNo())+1, operation.primaryTerm(), operation.version()+1,
|
||||
operation.versionType(), REPLICA, operation.startTime()+1);
|
||||
operation.versionType(), REPLICA, operation.startTime()+1, UNASSIGNED_SEQ_NO, 0);
|
||||
// operations with a seq# equal or lower to the local checkpoint are not indexed to lucene
|
||||
// and the version lookup is skipped
|
||||
final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
|
||||
|
@ -3424,19 +3466,19 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
boolean isRetry = false;
|
||||
long autoGeneratedIdTimestamp = 0;
|
||||
|
||||
Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
|
||||
Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
|
||||
index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(),
|
||||
null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
|
||||
null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
|
||||
isRetry = true;
|
||||
index = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
|
||||
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
engine.refresh("test");
|
||||
|
@ -3446,7 +3488,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
|
||||
index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(),
|
||||
null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
|
||||
null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = replicaEngine.index(index);
|
||||
assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
|
||||
replicaEngine.refresh("test");
|
||||
|
@ -3463,20 +3505,19 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
boolean isRetry = true;
|
||||
long autoGeneratedIdTimestamp = 0;
|
||||
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult result = engine.index(firstIndexRequest);
|
||||
assertThat(result.getVersion(), equalTo(1L));
|
||||
|
||||
Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(),
|
||||
firstIndexRequest.primaryTerm(), result.getVersion(), null, REPLICA, System.nanoTime(),
|
||||
autoGeneratedIdTimestamp, isRetry);
|
||||
Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(),
|
||||
result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexReplicaResult = replicaEngine.index(firstIndexRequestReplica);
|
||||
assertThat(indexReplicaResult.getVersion(), equalTo(1L));
|
||||
|
||||
isRetry = false;
|
||||
Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
|
||||
Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
|
||||
assertTrue(indexResult.isCreated());
|
||||
engine.refresh("test");
|
||||
|
@ -3486,7 +3527,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
|
||||
Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(),
|
||||
result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
|
||||
result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
replicaEngine.index(secondIndexRequestReplica);
|
||||
replicaEngine.refresh("test");
|
||||
try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) {
|
||||
|
@ -3504,13 +3545,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
|
||||
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
|
||||
return new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry);
|
||||
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
|
||||
return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, null,
|
||||
Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry);
|
||||
Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
public void testRetryConcurrently() throws InterruptedException, IOException {
|
||||
|
@ -3762,7 +3803,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Tuple<Long, Long> seqID = getSequenceID(engine, new Engine.Get(false, false,
|
||||
"type", "2", newUid("1")));
|
||||
// Non-existent doc returns no seqnum and no primary term
|
||||
assertThat(seqID.v1(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
|
||||
assertThat(seqID.v1(), equalTo(UNASSIGNED_SEQ_NO));
|
||||
assertThat(seqID.v2(), equalTo(0L));
|
||||
|
||||
// create a document
|
||||
|
@ -3793,9 +3834,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
document = testDocumentWithTextField();
|
||||
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc = testParsedDocument("1", null, document, B_1, null);
|
||||
engine.index(new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 3,
|
||||
engine.index(new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 3,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY,
|
||||
System.nanoTime(), -1, false));
|
||||
System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0));
|
||||
engine.refresh("test");
|
||||
|
||||
seqID = getSequenceID(engine, newGet(false, doc));
|
||||
|
@ -3823,11 +3864,10 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
|
||||
if (isIndexing) {
|
||||
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
|
||||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
|
||||
-1, true));
|
||||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true, UNASSIGNED_SEQ_NO, 0L));
|
||||
} else {
|
||||
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
|
||||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
|
||||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0L));
|
||||
}
|
||||
}
|
||||
seqNo++;
|
||||
|
@ -3854,7 +3894,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
|
||||
}
|
||||
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
|
||||
searcher.reader(), newUid("any-" + between(1, 10))), nullValue());
|
||||
searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
|
||||
Map<String, Long> liveOps = latestOps.entrySet().stream()
|
||||
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
|
||||
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
|
||||
|
@ -3989,7 +4029,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final AtomicLong sequenceNumber = new AtomicLong();
|
||||
final Engine.Operation.Origin origin = randomFrom(LOCAL_TRANSLOG_RECOVERY, PEER_RECOVERY, PRIMARY, REPLICA);
|
||||
final LongSupplier sequenceNumberSupplier =
|
||||
origin == PRIMARY ? () -> SequenceNumbers.UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement;
|
||||
origin == PRIMARY ? () -> UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement;
|
||||
final Supplier<ParsedDocument> doc = () -> {
|
||||
final Document document = testDocumentWithTextField();
|
||||
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
|
||||
|
@ -4009,7 +4049,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
origin,
|
||||
System.nanoTime(),
|
||||
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
|
||||
false);
|
||||
false, UNASSIGNED_SEQ_NO, 0);
|
||||
operations.add(index);
|
||||
} else {
|
||||
final Engine.Delete delete = new Engine.Delete(
|
||||
|
@ -4021,7 +4061,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
i,
|
||||
origin == PRIMARY ? VersionType.EXTERNAL : null,
|
||||
origin,
|
||||
System.nanoTime());
|
||||
System.nanoTime(), UNASSIGNED_SEQ_NO, 0);
|
||||
operations.add(delete);
|
||||
}
|
||||
}
|
||||
|
@ -4270,7 +4310,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), get.uid());
|
||||
if (docIdAndSeqNo == null) {
|
||||
primaryTerm = 0;
|
||||
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
seqNo = UNASSIGNED_SEQ_NO;
|
||||
} else {
|
||||
seqNo = docIdAndSeqNo.seqNo;
|
||||
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
||||
|
@ -4545,14 +4585,14 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final Engine.Index index = new Engine.Index(
|
||||
new Term("_id", parsedDocument.id()),
|
||||
parsedDocument,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
UNASSIGNED_SEQ_NO,
|
||||
randomIntBetween(1, 8),
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
System.currentTimeMillis(),
|
||||
System.currentTimeMillis(),
|
||||
randomBoolean());
|
||||
randomBoolean(), UNASSIGNED_SEQ_NO, 0);
|
||||
final Engine.IndexResult indexResult = e.index(index);
|
||||
assertThat(indexResult.getSeqNo(), equalTo(seqNo));
|
||||
assertThat(seqNoGenerator.get(), equalTo(seqNo + 1));
|
||||
|
@ -4561,12 +4601,12 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
type,
|
||||
id,
|
||||
new Term("_id", parsedDocument.id()),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
UNASSIGNED_SEQ_NO,
|
||||
randomIntBetween(1, 8),
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
System.currentTimeMillis());
|
||||
System.currentTimeMillis(), UNASSIGNED_SEQ_NO, 0);
|
||||
final Engine.DeleteResult deleteResult = e.delete(delete);
|
||||
assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1));
|
||||
assertThat(seqNoGenerator.get(), equalTo(seqNo + 2));
|
||||
|
@ -4620,7 +4660,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
|
||||
// Keep only one safe commit as the oldest commit.
|
||||
final IndexCommit safeCommit = commits.get(0);
|
||||
if (lastSyncedGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
if (lastSyncedGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
|
||||
// If the global checkpoint is still unassigned, we keep an empty(eg. initial) commit as a safe commit.
|
||||
assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
|
||||
equalTo(SequenceNumbers.NO_OPS_PERFORMED));
|
||||
|
@ -4887,29 +4927,27 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build();
|
||||
engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData);
|
||||
engine.onSettingsChanged();
|
||||
ParsedDocument document =
|
||||
testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
|
||||
final Engine.Index doc = new Engine.Index(newUid(document), document, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
|
||||
ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
|
||||
final Engine.Index doc = new Engine.Index(newUid(document), document, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false,
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
// first index an append only document and then delete it. such that we have it in the tombstones
|
||||
engine.index(doc);
|
||||
engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), primaryTerm.get()));
|
||||
|
||||
// now index more append only docs and refresh so we re-enabel the optimization for unsafe version map
|
||||
ParsedDocument document1 =
|
||||
testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
|
||||
engine.index(new Engine.Index(newUid(document1), document1, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
|
||||
ParsedDocument document1 = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
|
||||
engine.index(new Engine.Index(newUid(document1), document1, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false, UNASSIGNED_SEQ_NO, 0));
|
||||
engine.refresh("test");
|
||||
ParsedDocument document2 =
|
||||
testParsedDocument(Integer.toString(2), null, testDocumentWithTextField(), SOURCE, null);
|
||||
engine.index(new Engine.Index(newUid(document2), document2, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
|
||||
ParsedDocument document2 = testParsedDocument(Integer.toString(2), null, testDocumentWithTextField(), SOURCE, null);
|
||||
engine.index(new Engine.Index(newUid(document2), document2, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false, UNASSIGNED_SEQ_NO, 0));
|
||||
engine.refresh("test");
|
||||
ParsedDocument document3 =
|
||||
testParsedDocument(Integer.toString(3), null, testDocumentWithTextField(), SOURCE, null);
|
||||
final Engine.Index doc3 = new Engine.Index(newUid(document3), document3, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
|
||||
ParsedDocument document3 = testParsedDocument(Integer.toString(3), null, testDocumentWithTextField(), SOURCE, null);
|
||||
final Engine.Index doc3 = new Engine.Index(newUid(document3), document3, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false,
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
engine.index(doc3);
|
||||
engine.engineConfig.setEnableGcDeletes(true);
|
||||
// once we are here the version map is unsafe again and we need to do a refresh inside the get calls to ensure we
|
||||
|
@ -5031,7 +5069,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
engine.index(doc);
|
||||
} else {
|
||||
engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(),
|
||||
doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis()));
|
||||
doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0));
|
||||
}
|
||||
maxSeqNoOfNonAppendOnly = seqno;
|
||||
} else { // On primary - do not update max_seqno for non-append-only operations
|
||||
|
@ -5103,7 +5141,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(),
|
||||
new BytesArray("{}"), null);
|
||||
Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0,
|
||||
1, null, REPLICA, System.nanoTime(), -1, false);
|
||||
1, null, REPLICA, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
engine.index(index);
|
||||
if (randomBoolean()) {
|
||||
engine.flush();
|
||||
|
@ -5395,11 +5433,11 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
|
||||
if (randomBoolean()) {
|
||||
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
|
||||
i, null, Engine.Operation.Origin.REPLICA,
|
||||
threadPool.relativeTimeInMillis(), -1, true));
|
||||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true, UNASSIGNED_SEQ_NO, 0L));
|
||||
} else if (randomBoolean()) {
|
||||
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
|
||||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
|
||||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
|
||||
UNASSIGNED_SEQ_NO, 0L));
|
||||
} else {
|
||||
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
|
||||
threadPool.relativeTimeInMillis(), "test-" + i));
|
||||
|
|
|
@ -53,7 +53,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
|
|||
}
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
|
||||
System.nanoTime(), -1, false));
|
||||
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
if (get == null || rarely()) {
|
||||
get = newGet(randomBoolean(), doc);
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
|
|||
}
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
|
||||
System.nanoTime(), -1, false));
|
||||
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
if (rarely()) {
|
||||
engine.flush();
|
||||
}
|
||||
|
|
|
@ -2810,7 +2810,7 @@ public class TranslogTests extends ESTestCase {
|
|||
null);
|
||||
|
||||
Engine.Index eIndex = new Engine.Index(newUid(doc), doc, randomSeqNum, randomPrimaryTerm,
|
||||
1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false);
|
||||
1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomPrimaryTerm, randomSeqNum, true);
|
||||
Translog.Index index = new Translog.Index(eIndex, eIndexResult);
|
||||
|
||||
|
@ -2821,7 +2821,7 @@ public class TranslogTests extends ESTestCase {
|
|||
assertEquals(index, serializedIndex);
|
||||
|
||||
Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
|
||||
2, VersionType.INTERNAL, Origin.PRIMARY, 0);
|
||||
2, VersionType.INTERNAL, Origin.PRIMARY, 0, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.DeleteResult eDeleteResult = new Engine.DeleteResult(2, randomPrimaryTerm, randomSeqNum, true);
|
||||
Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.index.engine.Engine;
|
|||
import org.elasticsearch.index.engine.InternalEngineTests;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -254,7 +255,8 @@ public class FlushIT extends ESIntegTestCase {
|
|||
private void indexDoc(Engine engine, String id) throws IOException {
|
||||
final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null);
|
||||
final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc,
|
||||
engine.getLocalCheckpoint() + 1, 1L, 1L, null, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false));
|
||||
engine.getLocalCheckpoint() + 1, 1L, 1L, null, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
assertThat(indexResult.getFailure(), nullValue());
|
||||
}
|
||||
|
||||
|
|
|
@ -633,11 +633,12 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo,
|
||||
boolean isRetry) {
|
||||
return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, null, Engine.Operation.Origin.REPLICA,
|
||||
System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
|
||||
System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) {
|
||||
return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, null, Engine.Operation.Origin.REPLICA, startTime);
|
||||
return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, null, Engine.Operation.Origin.REPLICA, startTime,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
protected static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException {
|
||||
assertVisibleCount(engine, numDocs, true);
|
||||
|
@ -688,8 +689,8 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
version,
|
||||
forReplica ? null : versionType,
|
||||
forReplica ? REPLICA : PRIMARY,
|
||||
System.currentTimeMillis(), -1, false
|
||||
);
|
||||
System.currentTimeMillis(), -1, false,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
} else {
|
||||
op = new Engine.Delete("test", docId, id,
|
||||
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
|
@ -697,7 +698,7 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
version,
|
||||
forReplica ? null : versionType,
|
||||
forReplica ? REPLICA : PRIMARY,
|
||||
System.currentTimeMillis());
|
||||
System.currentTimeMillis(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
ops.add(op);
|
||||
}
|
||||
|
@ -722,11 +723,11 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
switch (opType) {
|
||||
case INDEX:
|
||||
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
|
||||
i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true));
|
||||
i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
break;
|
||||
case DELETE:
|
||||
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
|
||||
i, null, Engine.Operation.Origin.REPLICA, startTime));
|
||||
i, null, Engine.Operation.Origin.REPLICA, startTime, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
break;
|
||||
case NO_OP:
|
||||
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.index.mapper.DocumentMapperForType;
|
|||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.index.mapper.RootObjectMapper;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
@ -129,7 +130,7 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
|
|||
case DELETE:
|
||||
final Translog.Delete delete = (Translog.Delete) operation;
|
||||
final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
|
||||
delete.primaryTerm(), delete.version(), null, origin, System.nanoTime());
|
||||
delete.primaryTerm(), delete.version(), null, origin, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
return engineDelete;
|
||||
case NO_OP:
|
||||
final Translog.NoOp noOp = (Translog.NoOp) operation;
|
||||
|
|
|
@ -205,7 +205,8 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
randomNonNegativeLong(),
|
||||
VersionType.EXTERNAL,
|
||||
origin,
|
||||
System.currentTimeMillis());
|
||||
System.currentTimeMillis(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
|
||||
consumer.accept(followingEngine, delete);
|
||||
}
|
||||
|
@ -295,7 +296,8 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
final long version = randomBoolean() ? 1 : randomNonNegativeLong();
|
||||
final ParsedDocument parsedDocument = EngineTestCase.createParsedDoc(id, null);
|
||||
return new Engine.Index(EngineTestCase.newUid(parsedDocument), parsedDocument, seqNo, primaryTerm.get(), version,
|
||||
VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean());
|
||||
VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
private Engine.Index indexForPrimary(String id) {
|
||||
|
@ -315,11 +317,13 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
if (op instanceof Engine.Index) {
|
||||
Engine.Index index = (Engine.Index) op;
|
||||
result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(),
|
||||
versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry()));
|
||||
versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
} else if (op instanceof Engine.Delete) {
|
||||
Engine.Delete delete = (Engine.Delete) op;
|
||||
result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm,
|
||||
delete.version(), versionType, origin, delete.startTime()));
|
||||
delete.version(), versionType, origin, delete.startTime(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
} else {
|
||||
Engine.NoOp noOp = (Engine.NoOp) op;
|
||||
result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason()));
|
||||
|
@ -575,10 +579,12 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFunc.apply(docId, randomInt(3));
|
||||
if (randomBoolean()) {
|
||||
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
} else if (randomBoolean()) {
|
||||
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis()));
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
} else {
|
||||
operations.add(new Engine.NoOp(i, primaryTerm.get(), Engine.Operation.Origin.PRIMARY,
|
||||
threadPool.relativeTimeInMillis(), "test-" + i));
|
||||
|
|
|
@ -176,7 +176,7 @@ public class FrozenEngineTests extends EngineTestCase {
|
|||
numDocsAdded++;
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
|
||||
System.nanoTime(), -1, false));
|
||||
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
if (rarely()) {
|
||||
engine.flush();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue