diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java index 6e4e3e1923d..caaf7fc84af 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java @@ -29,7 +29,7 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; @@ -51,49 +51,66 @@ final class PerThreadIDAndVersionLookup { private final TermsEnum termsEnum; /** _version data */ private final NumericDocValues versions; + /** Reused for iteration (when the term exists) */ private PostingsEnum docsEnum; + /** used for assertions to make sure class usage meets assumptions */ + private final Object readerKey; + /** * Initialize lookup for the provided segment */ PerThreadIDAndVersionLookup(LeafReader reader) throws IOException { - TermsEnum termsEnum = null; - NumericDocValues versions = null; - Fields fields = reader.fields(); - if (fields != null) { - Terms terms = fields.terms(UidFieldMapper.NAME); - if (terms != null) { - termsEnum = terms.iterator(); - assert termsEnum != null; - versions = reader.getNumericDocValues(VersionFieldMapper.NAME); - assert versions != null; - } + Terms terms = fields.terms(UidFieldMapper.NAME); + termsEnum = terms.iterator(); + if (termsEnum == null) { + throw new IllegalArgumentException("reader misses the [" + UidFieldMapper.NAME + + "] field"); } - - this.versions = versions; - this.termsEnum = termsEnum; + versions = reader.getNumericDocValues(VersionFieldMapper.NAME); + if (versions == null) { + throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + + "] field"); + } + Object readerKey = null; + assert (readerKey = reader.getCoreCacheKey()) != null; + this.readerKey = readerKey; } /** Return null if id is not found. */ - public DocIdAndVersion lookup(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException { + public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context) + throws IOException { + assert context.reader().getCoreCacheKey().equals(readerKey) : + "context's reader is not the same as the reader class was initialized on."; + int docID = getDocID(id, liveDocs); + + if (docID != DocIdSetIterator.NO_MORE_DOCS) { + return new DocIdAndVersion(docID, versions.get(docID), context); + } else { + return null; + } + } + + /** + * returns the internal lucene doc id for the given id bytes. 
+ * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found + * */ + private int getDocID(BytesRef id, Bits liveDocs) throws IOException { if (termsEnum.seekExact(id)) { + int docID = DocIdSetIterator.NO_MORE_DOCS; // there may be more than one matching docID, in the case of nested docs, so we want the last one: docsEnum = termsEnum.postings(docsEnum, 0); - int docID = DocIdSetIterator.NO_MORE_DOCS; for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { if (liveDocs != null && liveDocs.get(d) == false) { continue; } docID = d; } - - if (docID != DocIdSetIterator.NO_MORE_DOCS) { - return new DocIdAndVersion(docID, versions.get(docID), context); - } + return docID; + } else { + return DocIdSetIterator.NO_MORE_DOCS; } - - return null; } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java index 6c5ffed0938..b7c62a8f244 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java @@ -19,32 +19,7 @@ package org.elasticsearch.common.lucene.uid; -import org.apache.lucene.index.Fields; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReader.CoreClosedListener; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.Terms; -import org.apache.lucene.index.TermsEnum; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.CloseableThreadLocal; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; -import org.elasticsearch.index.mapper.UidFieldMapper; -import org.elasticsearch.index.seqno.SequenceNumbersService; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - -/** Utility class to resolve the Lucene doc ID and version for a given uid. */ -public class Versions { +public final class Versions { /** used to indicate the write operation should succeed regardless of current version **/ public static final long MATCH_ANY = -3L; @@ -59,210 +34,4 @@ public class Versions { * i.e., not found in the index and/or found as deleted (with version) in the version map */ public static final long MATCH_DELETED = -4L; - - // TODO: is there somewhere else we can store these? 
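The caching machinery being removed below (and re-created unchanged in the new `VersionsResolver` class) keeps one `PerThreadIDAndVersionLookup` per thread per segment core: entries are keyed by the core cache key, shared across NRT reopens, and closed when the core is closed. A minimal sketch of the same pattern against the Lucene 6 `LeafReader` API used on this branch — the `PerCoreCache` and `Factory` names are illustrative, not part of the codebase:

```java
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.util.CloseableThreadLocal;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PerCoreCache<T> {

    interface Factory<T> {
        T create(LeafReader reader) throws IOException;
    }

    // one CloseableThreadLocal per segment core; the core cache key is stable
    // across reader reopens, so NRT refreshes keep hitting the same entry
    private final ConcurrentMap<Object, CloseableThreadLocal<T>> states = new ConcurrentHashMap<>();

    T get(LeafReader reader, Factory<T> factory) throws IOException {
        final Object key = reader.getCoreCacheKey();
        CloseableThreadLocal<T> ctl = states.get(key);
        if (ctl == null) {
            ctl = new CloseableThreadLocal<>();
            final CloseableThreadLocal<T> other = states.putIfAbsent(key, ctl);
            if (other == null) {
                // our entry won the race: close and evict it once the core closes
                reader.addCoreClosedListener(closedKey -> {
                    CloseableThreadLocal<T> removed = states.remove(closedKey);
                    if (removed != null) {
                        removed.close();
                    }
                });
            } else {
                ctl = other; // another thread beat us to it: reuse its entry
            }
        }
        T state = ctl.get();
        if (state == null) {
            state = factory.create(reader); // e.g. new PerThreadIDAndVersionLookup(reader)
            ctl.set(state);
        }
        return state;
    }
}
```

The per-thread indirection exists because `TermsEnum` and `PostingsEnum` instances are stateful and not thread-safe, while the core-closed listener keeps the map from leaking entries as segment cores are merged away.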
- static final ConcurrentMap> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - - // Evict this reader from lookupStates once it's closed: - private static final CoreClosedListener removeLookupState = new CoreClosedListener() { - @Override - public void onClose(Object key) { - CloseableThreadLocal ctl = lookupStates.remove(key); - if (ctl != null) { - ctl.close(); - } - } - }; - - private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) throws IOException { - Object key = reader.getCoreCacheKey(); - CloseableThreadLocal ctl = lookupStates.get(key); - if (ctl == null) { - // First time we are seeing this reader's core; make a - // new CTL: - ctl = new CloseableThreadLocal<>(); - CloseableThreadLocal other = lookupStates.putIfAbsent(key, ctl); - if (other == null) { - // Our CTL won, we must remove it when the - // core is closed: - reader.addCoreClosedListener(removeLookupState); - } else { - // Another thread beat us to it: just use - // their CTL: - ctl = other; - } - } - - PerThreadIDAndVersionLookup lookupState = ctl.get(); - if (lookupState == null) { - lookupState = new PerThreadIDAndVersionLookup(reader); - ctl.set(lookupState); - } - - return lookupState; - } - - private Versions() { - } - - /** Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and a version. */ - public static class DocIdAndVersion { - public final int docId; - public final long version; - public final LeafReaderContext context; - - public DocIdAndVersion(int docId, long version, LeafReaderContext context) { - this.docId = docId; - this.version = version; - this.context = context; - } - } - - /** - * Load the internal doc ID and version for the uid from the reader, returning
- * <ul>
- * <li>null if the uid wasn't found,</li>
- * <li>a doc ID and a version otherwise</li>
- * </ul>
- */ - public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME); - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return null; - } - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReaderContext context = leaves.get(i); - LeafReader leaf = context.reader(); - PerThreadIDAndVersionLookup lookup = getLookupState(leaf); - DocIdAndVersion result = lookup.lookup(term.bytes(), leaf.getLiveDocs(), context); - if (result != null) { - return result; - } - } - return null; - } - - /** - * Load the version for the uid from the reader, returning
- * <ul>
- * <li>{@link #NOT_FOUND} if no matching doc exists,</li>
- * <li>the version associated with the provided uid otherwise</li>
- * </ul>
- */ - public static long loadVersion(IndexReader reader, Term term) throws IOException { - final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); - return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; - } - - - /** - * Returns the sequence number for the given uid term, returning - * {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found. - */ - public static long loadSeqNo(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid"; - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReader leaf = leaves.get(i).reader(); - Bits liveDocs = leaf.getLiveDocs(); - - TermsEnum termsEnum = null; - SortedNumericDocValues dvField = null; - PostingsEnum docsEnum = null; - - final Fields fields = leaf.fields(); - if (fields != null) { - Terms terms = fields.terms(UidFieldMapper.NAME); - if (terms != null) { - termsEnum = terms.iterator(); - assert termsEnum != null; - dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME); - assert dvField != null; - - final BytesRef id = term.bytes(); - if (termsEnum.seekExact(id)) { - // there may be more than one matching docID, in the - // case of nested docs, so we want the last one: - docsEnum = termsEnum.postings(docsEnum, 0); - int docID = DocIdSetIterator.NO_MORE_DOCS; - for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { - if (liveDocs != null && liveDocs.get(d) == false) { - continue; - } - docID = d; - } - - if (docID != DocIdSetIterator.NO_MORE_DOCS) { - dvField.setDocument(docID); - assert dvField.count() == 1 : "expected only a single value for _seq_no but got " + - dvField.count(); - return dvField.valueAt(0); - } - } - } - } - - } - return SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - - /** - * Returns the primary term for the given uid term, returning {@code 0} if none is found. 
- */ - public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid"; - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return 0; - } - - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReader leaf = leaves.get(i).reader(); - Bits liveDocs = leaf.getLiveDocs(); - - TermsEnum termsEnum = null; - NumericDocValues dvField = null; - PostingsEnum docsEnum = null; - - final Fields fields = leaf.fields(); - if (fields != null) { - Terms terms = fields.terms(UidFieldMapper.NAME); - if (terms != null) { - termsEnum = terms.iterator(); - assert termsEnum != null; - dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - assert dvField != null; - - final BytesRef id = term.bytes(); - if (termsEnum.seekExact(id)) { - // there may be more than one matching docID, in the - // case of nested docs, so we want the last one: - docsEnum = termsEnum.postings(docsEnum, 0); - int docID = DocIdSetIterator.NO_MORE_DOCS; - for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { - if (liveDocs != null && liveDocs.get(d) == false) { - continue; - } - docID = d; - } - - if (docID != DocIdSetIterator.NO_MORE_DOCS) { - return dvField.get(docID); - } - } - } - } - - } - return 0; - } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java new file mode 100644 index 00000000000..fb5875cbae5 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java @@ -0,0 +1,263 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.lucene.uid; + +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReader.CoreClosedListener; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CloseableThreadLocal; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.UidFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbersService; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; + +/** Utility class to resolve the Lucene doc ID and version for a given uid. */ +public class VersionsResolver { + + static final ConcurrentMap> + lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + + // Evict this reader from lookupStates once it's closed: + private static final CoreClosedListener removeLookupState = key -> { + CloseableThreadLocal ctl = lookupStates.remove(key); + if (ctl != null) { + ctl.close(); + } + }; + + private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) + throws IOException { + Object key = reader.getCoreCacheKey(); + CloseableThreadLocal ctl = lookupStates.get(key); + if (ctl == null) { + // First time we are seeing this reader's core; make a + // new CTL: + ctl = new CloseableThreadLocal<>(); + CloseableThreadLocal other = + lookupStates.putIfAbsent(key, ctl); + if (other == null) { + // Our CTL won, we must remove it when the + // core is closed: + reader.addCoreClosedListener(removeLookupState); + } else { + // Another thread beat us to it: just use + // their CTL: + ctl = other; + } + } + + PerThreadIDAndVersionLookup lookupState = ctl.get(); + if (lookupState == null) { + lookupState = new PerThreadIDAndVersionLookup(reader); + ctl.set(lookupState); + } + + return lookupState; + } + + private VersionsResolver() { + } + + /** + * Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and + * a version. + **/ + public static class DocIdAndVersion { + public final int docId; + public final long version; + public final LeafReaderContext context; + + public DocIdAndVersion(int docId, long version, LeafReaderContext context) { + this.docId = docId; + this.version = version; + this.context = context; + } + } + + /** + * Load the internal doc ID and version for the uid from the reader, returning
+ * <ul>
+ * <li>null if the uid wasn't found,</li>
+ * <li>a doc ID and a version otherwise</li>
+ * </ul>
+ */ + public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) + throws IOException { + assert term.field().equals(UidFieldMapper.NAME); + List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + LeafReaderContext context = leaves.get(i); + LeafReader leaf = context.reader(); + PerThreadIDAndVersionLookup lookup = getLookupState(leaf); + DocIdAndVersion result = + lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context); + if (result != null) { + return result; + } + } + return null; + } + + /** + * Load the version for the uid from the reader, returning
+ * <ul>
+ * <li>{@link Versions#NOT_FOUND} if no matching doc exists,</li>
+ * <li>the version associated with the provided uid otherwise</li>
+ * </ul>
+ */ + public static long loadVersion(IndexReader reader, Term term) throws IOException { + final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); + return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; + } + + /** + * Returns the sequence number for the given uid term, returning + * {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found. + */ + public static long loadSeqNo(IndexReader reader, Term term) throws IOException { + assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid"; + List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + LeafReader leaf = leaves.get(i).reader(); + Bits liveDocs = leaf.getLiveDocs(); + + TermsEnum termsEnum = null; + SortedNumericDocValues dvField = null; + PostingsEnum docsEnum = null; + + final Fields fields = leaf.fields(); + if (fields != null) { + Terms terms = fields.terms(UidFieldMapper.NAME); + if (terms != null) { + termsEnum = terms.iterator(); + assert termsEnum != null; + dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME); + assert dvField != null; + + final BytesRef id = term.bytes(); + if (termsEnum.seekExact(id)) { + // there may be more than one matching docID, in the + // case of nested docs, so we want the last one: + docsEnum = termsEnum.postings(docsEnum, 0); + int docID = DocIdSetIterator.NO_MORE_DOCS; + for (int d = docsEnum.nextDoc(); + d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { + if (liveDocs != null && liveDocs.get(d) == false) { + continue; + } + docID = d; + } + + if (docID != DocIdSetIterator.NO_MORE_DOCS) { + dvField.setDocument(docID); + assert dvField.count() == 1 : + "expected only a single value for _seq_no but got " + + dvField.count(); + return dvField.valueAt(0); + } + } + } + } + + } + return SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + + /** + * Returns the primary term for the given uid term, returning {@code 0} if none is found. 
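`loadSeqNo` above and `loadPrimaryTerm` below repeat the uid-to-docID resolution loop that `PerThreadIDAndVersionLookup#getDocID` encapsulates: seek the uid term, then walk its postings and keep the last live hit (the `0` flags argument passed to `postings()` in the diff is `PostingsEnum.NONE`). Pulled out on its own — the `lastLiveDoc` helper name is illustrative, not part of the patch — the loop looks like this:

```java
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;

final class UidResolution {

    /** Resolve a uid to the last live matching doc in one segment, or NO_MORE_DOCS. */
    static int lastLiveDoc(LeafReader leaf, String uidField, BytesRef id) throws IOException {
        final Terms terms = leaf.fields().terms(uidField);
        if (terms == null) {
            return DocIdSetIterator.NO_MORE_DOCS; // segment has no docs with this field
        }
        final TermsEnum termsEnum = terms.iterator();
        if (termsEnum.seekExact(id) == false) {
            return DocIdSetIterator.NO_MORE_DOCS; // uid not present in this segment
        }
        final Bits liveDocs = leaf.getLiveDocs();
        final PostingsEnum postings = termsEnum.postings(null, PostingsEnum.NONE);
        int last = DocIdSetIterator.NO_MORE_DOCS;
        for (int d = postings.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = postings.nextDoc()) {
            if (liveDocs == null || liveDocs.get(d)) {
                // keep scanning: nested children share the root document's uid and
                // the root is indexed last, so the last live hit is the root
                last = d;
            }
        }
        return last;
    }
}
```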
+ */ + public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException { + assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid"; + List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return 0; + } + + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + LeafReader leaf = leaves.get(i).reader(); + Bits liveDocs = leaf.getLiveDocs(); + + TermsEnum termsEnum = null; + NumericDocValues dvField = null; + PostingsEnum docsEnum = null; + + final Fields fields = leaf.fields(); + if (fields != null) { + Terms terms = fields.terms(UidFieldMapper.NAME); + if (terms != null) { + termsEnum = terms.iterator(); + assert termsEnum != null; + dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + assert dvField != null; + + final BytesRef id = term.bytes(); + if (termsEnum.seekExact(id)) { + // there may be more than one matching docID, in the + // case of nested docs, so we want the last one: + docsEnum = termsEnum.postings(docsEnum, 0); + int docID = DocIdSetIterator.NO_MORE_DOCS; + for (int d = docsEnum.nextDoc(); + d != DocIdSetIterator.NO_MORE_DOCS; + d = docsEnum.nextDoc()) { + if (liveDocs != null && liveDocs.get(d) == false) { + continue; + } + docID = d; + } + + if (docID != DocIdSetIterator.NO_MORE_DOCS) { + return dvField.get(docID); + } + } + } + } + + } + return 0; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index dbfb2416bae..45b28d96ba2 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -35,12 +35,12 @@ class DeleteVersionValue extends VersionValue { } @Override - public long time() { + public long getTime() { return this.time; } @Override - public boolean delete() { + public boolean isDelete() { return true; } @@ -52,8 +52,8 @@ class DeleteVersionValue extends VersionValue { @Override public String toString() { return "DeleteVersionValue{" + - "version=" + version() + ", " + - "time=" + time + + "version=" + getVersion() + + ",time=" + time + '}'; } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index f6b452502a5..217bc459282 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -55,6 +55,8 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.lucene.uid.VersionsResolver; +import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -419,9 +421,9 @@ public abstract class Engine implements Closeable { this.found = found; } - public DeleteResult(Exception failure, long version, long seqNo) { + public DeleteResult(Exception failure, long version, long seqNo, boolean found) { super(Operation.TYPE.DELETE, failure, version, seqNo); - this.found = false; + this.found = found; } public boolean isFound() { @@ -460,9 
+462,9 @@ public abstract class Engine implements Closeable { protected final GetResult getFromSearcher(Get get, Function searcherFactory) throws EngineException { final Searcher searcher = searcherFactory.apply("get"); - final Versions.DocIdAndVersion docIdAndVersion; + final DocIdAndVersion docIdAndVersion; try { - docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid()); + docIdAndVersion = VersionsResolver.loadDocIdAndVersion(searcher.reader(), get.uid()); } catch (Exception e) { Releasables.closeWhileHandlingException(searcher); //TODO: A better exception goes here @@ -1037,9 +1039,10 @@ public abstract class Engine implements Closeable { } // TEST ONLY Index(Term uid, ParsedDocument doc, long version) { - this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, VersionType.INTERNAL, + // use a primary term of 2 to allow tests to reduce it to a valid >0 term + this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 2, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), -1, false); - } + } // TEST ONLY public ParsedDocument parsedDoc() { return this.doc; @@ -1227,12 +1230,12 @@ public abstract class Engine implements Closeable { public static class GetResult implements Releasable { private final boolean exists; private final long version; - private final Versions.DocIdAndVersion docIdAndVersion; + private final DocIdAndVersion docIdAndVersion; private final Searcher searcher; public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null); - private GetResult(boolean exists, long version, Versions.DocIdAndVersion docIdAndVersion, Searcher searcher) { + private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Searcher searcher) { this.exists = exists; this.version = version; this.docIdAndVersion = docIdAndVersion; @@ -1242,7 +1245,7 @@ public abstract class Engine implements Closeable { /** * Build a non-realtime get result from the searcher. 
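The `getFromSearcher` change above is mostly a mechanical move to `VersionsResolver`, but the ownership rule it encodes is easy to miss: on a miss the acquired searcher must be released immediately, while on a hit the returned `GetResult` takes ownership and releases it when closed. A condensed restatement of that flow, with the get-API version check and exception handling elided:

```java
// condensed restatement of Engine#getFromSearcher (simplified sketch)
protected GetResult getFromSearcher(Get get, Function<String, Searcher> searcherFactory)
        throws IOException {
    final Searcher searcher = searcherFactory.apply("get");
    final DocIdAndVersion docIdAndVersion =
        VersionsResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
    if (docIdAndVersion == null) {
        Releasables.close(searcher);  // miss: release the searcher right away
        return GetResult.NOT_EXISTS;
    }
    // hit: the GetResult takes ownership of the searcher and releases it later
    return new GetResult(searcher, docIdAndVersion);
}
```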
*/ - public GetResult(Searcher searcher, Versions.DocIdAndVersion docIdAndVersion) { + public GetResult(Searcher searcher, DocIdAndVersion docIdAndVersion) { this(true, docIdAndVersion.version, docIdAndVersion, searcher); } @@ -1258,7 +1261,7 @@ public abstract class Engine implements Closeable { return this.searcher; } - public Versions.DocIdAndVersion docIdAndVersion() { + public DocIdAndVersion docIdAndVersion() { return docIdAndVersion; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0fa6855ce08..aed68d812a6 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -51,6 +51,7 @@ import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.lucene.uid.VersionsResolver; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -385,13 +386,13 @@ public class InternalEngine extends Engine { if (get.realtime()) { VersionValue versionValue = versionMap.getUnderLock(get.uid()); if (versionValue != null) { - if (versionValue.delete()) { + if (versionValue.isDelete()) { return GetResult.NOT_EXISTS; } - if (get.versionType().isVersionConflictForReads(versionValue.version(), get.version())) { + if (get.versionType().isVersionConflictForReads(versionValue.getVersion(), get.version())) { Uid uid = Uid.createUid(get.uid().text()); throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), - get.versionType().explainConflictForReads(versionValue.version(), get.version())); + get.versionType().explainConflictForReads(versionValue.getVersion(), get.version())); } refresh("realtime_get"); } @@ -403,55 +404,46 @@ public class InternalEngine extends Engine { } /** - * Checks for version conflicts. If a non-critical version conflict exists true is returned. In the case of a critical - * version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown. - * - * @param op the operation - * @param currentVersion the current version - * @param expectedVersion the expected version - * @param deleted {@code true} if the current version is not found or represents a delete - * @return true iff a non-critical version conflict (origin recovery or replica) is found otherwise false - * @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary - * @throws IllegalArgumentException if an unsupported version type is used. 
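The `checkVersionConflict` method removed below folded three concerns together: rejecting `FORCE` versioning on newer indices, throwing `VersionConflictEngineException` on primaries, and silently tolerating conflicts on replicas and during recovery. The conflict test itself delegates to `VersionType`; for internal versioning it reduces to roughly the following — a simplified illustration of the assumed semantics, see `VersionType#isVersionConflictForWrites` for the authoritative logic:

```java
// simplified illustration of internal-versioning write conflicts, using the
// Versions constants kept at the top of this patch (MATCH_ANY, MATCH_DELETED)
static boolean isInternalWriteConflict(long currentVersion, long expectedVersion, boolean deleted) {
    if (expectedVersion == Versions.MATCH_ANY) {
        return false;                         // caller accepts whatever is there
    }
    if (expectedVersion == Versions.MATCH_DELETED) {
        return deleted == false;              // only proceed if the doc is gone
    }
    if (currentVersion == Versions.NOT_FOUND) {
        return true;                          // expected a concrete version, found nothing
    }
    return currentVersion != expectedVersion; // compare-and-set on the version
}
```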
+ * the status of the current doc version in lucene, compared to the version in an incoming + * operation */ - private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) { - if (op.versionType() == VersionType.FORCE) { - if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - // If index was created in 5.0 or later, 'force' is not allowed at all - throw new IllegalArgumentException("version type [FORCE] may not be used for indices created after 6.0"); - } else if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - // For earlier indices, 'force' is only allowed for translog recovery - throw new IllegalArgumentException("version type [FORCE] may not be used for non-translog operations"); - } - } - - if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (op.origin() == Operation.Origin.PRIMARY) { - // fatal version conflict - throw new VersionConflictEngineException( - shardId, - op.type(), - op.id(), - op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); - - } else { - /* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a - * successful result.*/ - return true; - } - } else { - return false; - } + enum OpVsLuceneDocStatus { + /** the op is more recent than the one that last modified the doc found in lucene*/ + OP_NEWER, + /** the op is older or the same as the one that last modified the doc found in lucene*/ + OP_STALE_OR_EQUAL, + /** no doc was found in lucene */ + LUCENE_DOC_NOT_FOUND } - private long checkDeletedAndGCed(VersionValue versionValue) { - long currentVersion; - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC - } else { - currentVersion = versionValue.version(); + /** resolves the current version of the document, returning null if not found */ + private VersionValue resolveDocVersion(final Operation op) throws IOException { + assert incrementVersionLookup(); // used for asserting in tests + VersionValue versionValue = versionMap.getUnderLock(op.uid()); + if (versionValue == null) { + assert incrementIndexVersionLookup(); // used for asserting in tests + final long currentVersion = loadCurrentVersionFromIndex(op.uid()); + if (currentVersion != Versions.NOT_FOUND) { + versionValue = new VersionValue(currentVersion); + } + } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() && + (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.getTime()) > + getGcDeletesInMillis()) { + versionValue = null; + } + return versionValue; + } + + private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation op) + throws IOException { + assert op.version() >= 0 : "versions should be non-negative. got " + op.version(); + final VersionValue versionValue = resolveDocVersion(op); + if (versionValue == null) { + return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; + } else { + return op.version() > versionValue.getVersion() ? 
+ OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } - return currentVersion; } private boolean canOptimizeAddDocument(Index index) { @@ -492,7 +484,7 @@ public class InternalEngine extends Engine { return true; } - private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { + private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) && origin == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { // legacy support assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" + @@ -507,20 +499,29 @@ public class InternalEngine extends Engine { return true; } + private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) { + if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED) || + origin == Operation.Origin.PRIMARY) { + // sequence number should be set when operation origin is primary or when all shards are on new nodes + assert seqNo >= 0 : "ops should have an assigned seq no.; origin: " + origin; + } + return true; + } + + @Override public IndexResult index(Index index) throws IOException { final boolean doThrottle = index.origin().isRecovery() == false; try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); - assert assertSequenceNumber(index.origin(), index.seqNo()); + assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); assert assertVersionType(index); - final Translog.Location location; - long seqNo = index.seqNo(); try (Releasable ignored = acquireLock(index.uid()); Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) { lastWriteNanos = index.startTime(); - /* if we have an autoGeneratedID that comes into the engine we can potentially optimize - * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board. + /* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS: + * if we have an autoGeneratedID that comes into the engine we can potentially optimize + * and just use addDocument instead of updateDocument and skip the entire version and index lookupVersion across the board. * Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added * to detect if it has potentially been added before. We use the documents timestamp for this since it's something * that: @@ -543,62 +544,37 @@ public class InternalEngine extends Engine { * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls * updateDocument. */ - long currentVersion; - final boolean deleted; - // if anything is fishy here ie. 
there is a retry we go and force updateDocument below so we are updating the document in the - // lucene index without checking the version map but we still do the version check - final boolean forceUpdateDocument; - final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); - if (canOptimizeAddDocument) { - forceUpdateDocument = isForceUpdateDocument(index); - currentVersion = Versions.NOT_FOUND; - deleted = true; + final IndexingStrategy plan; + + if (index.origin() == Operation.Origin.PRIMARY) { + plan = planIndexingAsPrimary(index); } else { - // update the document - forceUpdateDocument = false; // we don't force it - it depends on the version - final VersionValue versionValue = versionMap.getUnderLock(index.uid()); - assert incrementVersionLookup(); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(index.uid()); - deleted = currentVersion == Versions.NOT_FOUND; - } else { - currentVersion = checkDeletedAndGCed(versionValue); - deleted = versionValue.delete(); - } - } - final long expectedVersion = index.version(); - Optional resultOnVersionConflict; - try { - final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted); - resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false)) - : Optional.empty(); - } catch (IllegalArgumentException | VersionConflictEngineException ex) { - resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo())); + // non-primary mode (i.e., replica or recovery) + plan = planIndexingAsNonPrimary(index); } final IndexResult indexResult; - if (resultOnVersionConflict.isPresent()) { - indexResult = resultOnVersionConflict.get(); + if (plan.earlyResultOnPreFlightError.isPresent()) { + indexResult = plan.earlyResultOnPreFlightError.get(); + assert indexResult.hasFailure(); + } else if (plan.indexIntoLucene) { + indexResult = indexIntoLucene(index, plan); } else { - // no version conflict - if (index.origin() == Operation.Origin.PRIMARY) { - seqNo = seqNoService().generateSeqNo(); - } - indexResult = indexIntoLucene(index, seqNo, currentVersion, deleted, forceUpdateDocument, canOptimizeAddDocument, expectedVersion); + indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, + plan.currentNotFoundOrDeleted); } - if (indexResult.hasFailure() == false) { - location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY - ? 
translog.add(new Translog.Index(index, indexResult)) - : null; + if (indexResult.hasFailure() == false && + index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + Translog.Location location = + translog.add(new Translog.Index(index, indexResult)); indexResult.setTranslogLocation(location); } + if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoService().markSeqNoAsCompleted(indexResult.getSeqNo()); + } indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); return indexResult; - } finally { - if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(seqNo); - } } } catch (RuntimeException | IOException e) { try { @@ -610,24 +586,94 @@ public class InternalEngine extends Engine { } } - private IndexResult indexIntoLucene(Index index, long seqNo, long currentVersion, boolean deleted, boolean forceUpdateDocument, boolean canOptimizeAddDocument, long expectedVersion) throws IOException { + private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { + final IndexingStrategy plan; + if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { + // no need to deal with out of order delivery - we never saw this one + assert index.version() == 1L : + "can optimize on replicas but incoming version is [" + index.version() + "]"; + plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); + } else { + // drop out of order operations + assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : + "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + + index.versionType() + "]"; + // unlike the primary, replicas don't really care to about creation status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return false for the created flag in favor of code simplicity + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); + } else { + plan = IndexingStrategy.processNormally( + opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version() + ); + } + } + return plan; + } + + private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { + assert index.origin() == Operation.Origin.PRIMARY : + "planing as primary but origin isn't. 
got " + index.origin(); + final IndexingStrategy plan; + // resolve an external operation into an internal one which is safe to replay + if (canOptimizeAddDocument(index)) { + if (mayHaveBeenIndexedBefore(index)) { + plan = IndexingStrategy.overrideExistingAsIfNotThere(seqNoService().generateSeqNo(), 1L); + } else { + plan = IndexingStrategy.optimizedAppendOnly(seqNoService().generateSeqNo()); + } + } else { + // resolves incoming version + final VersionValue versionValue = resolveDocVersion(index); + final long currentVersion; + final boolean currentNotFoundOrDeleted; + if (versionValue == null) { + currentVersion = Versions.NOT_FOUND; + currentNotFoundOrDeleted = true; + } else { + currentVersion = versionValue.getVersion(); + currentNotFoundOrDeleted = versionValue.isDelete(); + } + if (index.versionType().isVersionConflictForWrites( + currentVersion, index.version(), currentNotFoundOrDeleted)) { + plan = IndexingStrategy.skipDueToVersionConflict( + new VersionConflictEngineException(shardId, index, currentVersion, + currentNotFoundOrDeleted), + currentNotFoundOrDeleted, currentVersion); + } else { + plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, + seqNoService().generateSeqNo(), + index.versionType().updateVersion(currentVersion, index.version()) + ); + } + } + return plan; + } + + private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) + throws IOException { + assert assertSequenceNumberBeforeIndexing(index.origin(), plan.seqNoForIndexing); + assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing; + assert plan.indexIntoLucene; /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. */ - index.parsedDoc().updateSeqID(seqNo, index.primaryTerm()); - final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); - index.parsedDoc().version().setLongValue(updatedVersion); + index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm()); + index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { - if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { - // document does not exists, we can optimize for create, but double check if assertions are running - assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); - index(index.docs(), indexWriter); - } else { + if (plan.useLuceneUpdateDocument) { update(index.uid(), index.docs(), indexWriter); + } else { + // document does not exists, we can optimize for create, but double check if assertions are running + assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); + index(index.docs(), indexWriter); } - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); - return new IndexResult(updatedVersion, seqNo, deleted); + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(plan.versionForIndexing)); + return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, + plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { /* There is no tragic event recorded so this must be a document failure. 
@@ -639,19 +685,29 @@ public class InternalEngine extends Engine { * Bottom line is that we can only rely on the fact that if it's a document failure then * `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather * non-document failure + * + * we return a `MATCH_ANY` version to indicate no document was index. The value is + * not used anyway */ - return new IndexResult(ex, currentVersion, index.seqNo()); + return new IndexResult(ex, Versions.MATCH_ANY, index.seqNo()); } else { throw ex; } } } - private boolean isForceUpdateDocument(Index index) { - boolean forceUpdateDocument; + /** + * returns true if the indexing operation may have already be processed by this engine. + * Note that it is OK to rarely return true even if this is not the case. However a `false` + * return value must always be correct. + * + */ + private boolean mayHaveBeenIndexedBefore(Index index) { + assert canOptimizeAddDocument(index); + boolean mayHaveBeenIndexBefore; long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get(); if (index.isRetry()) { - forceUpdateDocument = true; + mayHaveBeenIndexBefore = true; do { deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get(); if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) { @@ -662,9 +718,9 @@ public class InternalEngine extends Engine { assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); } else { // in this case we force - forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp(); + mayHaveBeenIndexBefore = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp(); } - return forceUpdateDocument; + return mayHaveBeenIndexBefore; } private static void index(final List docs, final IndexWriter indexWriter) throws IOException { @@ -675,13 +731,70 @@ public class InternalEngine extends Engine { } } + private static final class IndexingStrategy { + final boolean currentNotFoundOrDeleted; + final boolean useLuceneUpdateDocument; + final long seqNoForIndexing; + final long versionForIndexing; + final boolean indexIntoLucene; + final Optional earlyResultOnPreFlightError; + + private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, + boolean indexIntoLucene, long seqNoForIndexing, + long versionForIndexing, IndexResult earlyResultOnPreFlightError) { + assert useLuceneUpdateDocument == false || indexIntoLucene : + "use lucene update is set to true, but we're not indexing into lucene"; + assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false : + "can only index into lucene or have a preflight result but not both." + + "indexIntoLucene: " + indexIntoLucene + + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; + this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; + this.useLuceneUpdateDocument = useLuceneUpdateDocument; + this.seqNoForIndexing = seqNoForIndexing; + this.versionForIndexing = versionForIndexing; + this.indexIntoLucene = indexIntoLucene; + this.earlyResultOnPreFlightError = + earlyResultOnPreFlightError == null ? 
Optional.empty() : + Optional.of(earlyResultOnPreFlightError); + } + + static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { + return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null); + } + + static IndexingStrategy skipDueToVersionConflict(VersionConflictEngineException e, + boolean currentNotFoundOrDeleted, + long currentVersion) { + return new IndexingStrategy(currentNotFoundOrDeleted, false, + false, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, + new IndexResult(e, currentVersion)); + } + + static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, + long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, + true, seqNoForIndexing, versionForIndexing, null); + } + + static IndexingStrategy overrideExistingAsIfNotThere( + long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(true, true, true, seqNoForIndexing, versionForIndexing, null); + } + + static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, + long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(currentNotFoundOrDeleted, false, + false, seqNoForIndexing, versionForIndexing, null); + } + } + /** * Asserts that the doc in the index operation really doesn't exist */ private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException { final VersionValue versionValue = versionMap.getUnderLock(index.uid()); if (versionValue != null) { - if (versionValue.delete() == false || allowDeleted == false) { + if (versionValue.isDelete() == false || allowDeleted == false) { throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")"); } } else { @@ -705,12 +818,39 @@ public class InternalEngine extends Engine { @Override public DeleteResult delete(Delete delete) throws IOException { - DeleteResult result; - try (ReleasableLock ignored = readLock.acquire()) { - assert assertVersionType(delete); + assert assertVersionType(delete); + assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); + final DeleteResult deleteResult; + // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: + try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) { ensureOpen(); - // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: - result = innerDelete(delete); + lastWriteNanos = delete.startTime(); + final DeletionStrategy plan; + if (delete.origin() == Operation.Origin.PRIMARY) { + plan = planDeletionAsPrimary(delete); + } else { + plan = planDeletionAsNonPrimary(delete); + } + + if (plan.earlyResultOnPreflightError.isPresent()) { + deleteResult = plan.earlyResultOnPreflightError.get(); + } else if (plan.deleteFromLucene) { + deleteResult = deleteInLucene(delete, plan); + } else { + deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion, + plan.currentlyDeleted == false); + } + if (!deleteResult.hasFailure() && + delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + Translog.Location location = + translog.add(new Translog.Delete(delete, deleteResult)); + deleteResult.setTranslogLocation(location); + } + if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo()); + } + 
deleteResult.setTook(System.nanoTime() - delete.startTime()); + deleteResult.freeze(); } catch (RuntimeException | IOException e) { try { maybeFailEngine("index", e); @@ -720,7 +860,128 @@ public class InternalEngine extends Engine { throw e; } maybePruneDeletedTombstones(); - return result; + return deleteResult; + } + + private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { + assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + + delete.origin(); + // drop out of order operations + assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : + "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + + delete.versionType() + "]"; + // unlike the primary, replicas don't really care to about found status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return true for the found flag in favor of code simplicity + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(delete); + + final DeletionStrategy plan; + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); + } else { + plan = DeletionStrategy.processNormally( + opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, + delete.seqNo(), delete.version()); + } + return plan; + } + + private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { + assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + + delete.origin(); + // resolve operation from external to internal + final VersionValue versionValue = resolveDocVersion(delete); + assert incrementVersionLookup(); + final long currentVersion; + final boolean currentlyDeleted; + if (versionValue == null) { + currentVersion = Versions.NOT_FOUND; + currentlyDeleted = true; + } else { + currentVersion = versionValue.getVersion(); + currentlyDeleted = versionValue.isDelete(); + } + final DeletionStrategy plan; + if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) { + plan = DeletionStrategy.skipDueToVersionConflict( + new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted), + currentVersion, currentlyDeleted); + } else { + plan = DeletionStrategy.processNormally(currentlyDeleted, + seqNoService().generateSeqNo(), + delete.versionType().updateVersion(currentVersion, delete.version())); + } + return plan; + } + + private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) + throws IOException { + try { + if (plan.currentlyDeleted == false) { + // any exception that comes from this is a either an ACE or a fatal exception there + // can't be any document failures coming from this + indexWriter.deleteDocuments(delete.uid()); + } + versionMap.putUnderLock(delete.uid().bytes(), + new DeleteVersionValue(plan.versionOfDeletion, + engineConfig.getThreadPool().relativeTimeInMillis())); + return new DeleteResult( + plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); + } catch (Exception ex) { + if (indexWriter.getTragicException() == null) { + // there is no tragic event and such it must be a document level failure + return new DeleteResult(ex, plan.versionOfDeletion, plan.versionOfDeletion, + plan.currentlyDeleted == false); + } else { + throw ex; + } + } + } + + private static final class DeletionStrategy { + // 
of a rare double delete + final boolean deleteFromLucene; + final boolean currentlyDeleted; + final long seqNoOfDeletion; + final long versionOfDeletion; + final Optional earlyResultOnPreflightError; + + private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted, + long seqNoOfDeletion, long versionOfDeletion, + DeleteResult earlyResultOnPreflightError) { + assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : + "can only delete from lucene or have a preflight result but not both." + + "deleteFromLucene: " + deleteFromLucene + + " earlyResultOnPreFlightError:" + earlyResultOnPreflightError; + this.deleteFromLucene = deleteFromLucene; + this.currentlyDeleted = currentlyDeleted; + this.seqNoOfDeletion = seqNoOfDeletion; + this.versionOfDeletion = versionOfDeletion; + this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ? + Optional.empty() : Optional.of(earlyResultOnPreflightError); + } + + static DeletionStrategy skipDueToVersionConflict(VersionConflictEngineException e, + long currentVersion, boolean currentlyDeleted) { + return new DeletionStrategy(false, currentlyDeleted, + SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, + new DeleteResult(e, currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, + currentlyDeleted == false)); + } + + static DeletionStrategy processNormally(boolean currentlyDeleted, + long seqNoOfDeletion, long versionOfDeletion) { + return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, + null); + + } + + public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, + long seqNoOfDeletion, + long versionOfDeletion) { + return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, + null); + } } private void maybePruneDeletedTombstones() { @@ -731,84 +992,6 @@ public class InternalEngine extends Engine { } } - private DeleteResult innerDelete(Delete delete) throws IOException { - assert assertSequenceNumber(delete.origin(), delete.seqNo()); - final Translog.Location location; - final long updatedVersion; - final boolean found; - long seqNo = delete.seqNo(); - try (Releasable ignored = acquireLock(delete.uid())) { - lastWriteNanos = delete.startTime(); - final long currentVersion; - final boolean deleted; - final VersionValue versionValue = versionMap.getUnderLock(delete.uid()); - assert incrementVersionLookup(); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(delete.uid()); - deleted = currentVersion == Versions.NOT_FOUND; - } else { - currentVersion = checkDeletedAndGCed(versionValue); - deleted = versionValue.delete(); - } - - final long expectedVersion = delete.version(); - Optional resultOnVersionConflict; - try { - final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted); - resultOnVersionConflict = isVersionConflict ? 
Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true)) - : Optional.empty(); - } catch (IllegalArgumentException | VersionConflictEngineException ex) { - resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo())); - } - final DeleteResult deleteResult; - if (resultOnVersionConflict.isPresent()) { - deleteResult = resultOnVersionConflict.get(); - } else { - if (delete.origin() == Operation.Origin.PRIMARY) { - seqNo = seqNoService().generateSeqNo(); - } - updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); - found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); - deleteResult = new DeleteResult(updatedVersion, seqNo, found); - - versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis())); - } - if (!deleteResult.hasFailure()) { - location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY - ? translog.add(new Translog.Delete(delete, deleteResult)) - : null; - deleteResult.setTranslogLocation(location); - } - deleteResult.setTook(System.nanoTime() - delete.startTime()); - deleteResult.freeze(); - return deleteResult; - } finally { - if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(seqNo); - } - } - } - - private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException { - assert uid != null : "uid must not be null"; - final boolean found; - if (currentVersion == Versions.NOT_FOUND) { - // doc does not exist and no prior deletes - found = false; - } else if (versionValue != null && deleted) { - // a "delete on delete", in this case, we still increment the version, log it, and return that version - found = false; - } else { - // we deleted a currently existing document - // any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming - // from this. 
- indexWriter.deleteDocuments(uid); - found = true; - } - return found; - } - @Override public NoOpResult noOp(final NoOp noOp) { NoOpResult noOpResult; @@ -1059,7 +1242,7 @@ public class InternalEngine extends Engine { // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: VersionValue versionValue = versionMap.getTombstoneUnderLock(uid); if (versionValue != null) { - if (timeMSec - versionValue.time() > getGcDeletesInMillis()) { + if (timeMSec - versionValue.getTime() > getGcDeletesInMillis()) { versionMap.removeTombstoneUnderLock(uid); } } @@ -1069,6 +1252,11 @@ public class InternalEngine extends Engine { lastDeleteVersionPruneTimeMSec = timeMSec; } + // testing + void clearDeletedTombstones() { + versionMap.clearTombstones(); + } + @Override public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException { @@ -1302,7 +1490,7 @@ public class InternalEngine extends Engine { private long loadCurrentVersionFromIndex(Term uid) throws IOException { assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_version")) { - return Versions.loadVersion(searcher.reader(), uid); + return VersionsResolver.loadVersion(searcher.reader(), uid); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 44b302e85cd..7233420309c 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -164,7 +164,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { if (prev != null) { // Deduct RAM for the version we just replaced: long prevBytes = BASE_BYTES_PER_CHM_ENTRY; - if (prev.delete() == false) { + if (prev.isDelete() == false) { prevBytes += prev.ramBytesUsed() + uidRAMBytesUsed; } ramBytesUsedCurrent.addAndGet(-prevBytes); @@ -172,13 +172,13 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { // Add RAM for the new version: long newBytes = BASE_BYTES_PER_CHM_ENTRY; - if (version.delete() == false) { + if (version.isDelete() == false) { newBytes += version.ramBytesUsed() + uidRAMBytesUsed; } ramBytesUsedCurrent.addAndGet(newBytes); final VersionValue prevTombstone; - if (version.delete()) { + if (version.isDelete()) { // Also enroll the delete into tombstones, and account for its RAM too: prevTombstone = tombstones.put(uid, version); @@ -187,7 +187,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { // the accounting to current: ramBytesUsedTombstones.addAndGet(BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed); - if (prevTombstone == null && prev != null && prev.delete()) { + if (prevTombstone == null && prev != null && prev.isDelete()) { // If prev was a delete that had already been removed from tombstones, then current was already accounting for the // BytesRef/VersionValue RAM, so we now deduct that as well: ramBytesUsedCurrent.addAndGet(-(prev.ramBytesUsed() + uidRAMBytesUsed)); @@ -211,12 +211,12 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { final VersionValue prev = tombstones.remove(uid); if (prev != null) { - assert prev.delete(); + assert prev.isDelete(); long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + 
prev.ramBytesUsed() + uidRAMBytesUsed)); assert v >= 0: "bytes=" + v; } final VersionValue curVersion = maps.current.get(uid); - if (curVersion != null && curVersion.delete()) { + if (curVersion != null && curVersion.isDelete()) { // We now shift accounting of the BytesRef from tombstones to current, because a refresh would clear this RAM. This should be // uncommon, because with the default refresh=1s and gc_deletes=60s, deletes should be cleared from current long before we drop // them from tombstones: @@ -234,6 +234,11 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { return tombstones.entrySet(); } + /** clears all tombstone ops */ + void clearTombstones() { + tombstones.clear(); + } + /** Called when this index is closed. */ synchronized void clear() { maps = new Maps(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java b/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java index b743141d0c7..f829e35af89 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java +++ b/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java @@ -26,6 +26,10 @@ import java.io.IOException; public class VersionConflictEngineException extends EngineException { + public VersionConflictEngineException(ShardId shardId, Engine.Operation op, long currentVersion, boolean deleted) { + this(shardId, op.type(), op.id(), op.versionType().explainConflictForWrites(currentVersion, op.version(), deleted)); + } + public VersionConflictEngineException(ShardId shardId, String type, String id, String explanation) { this(shardId, null, type, id, explanation); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java index 6ee484b2b38..53550578cc3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -29,25 +29,25 @@ class VersionValue implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class); + /** the version of the document. 
used for versioned indexed operations and as a BWC layer, where no seq# are set yet */ private final long version; VersionValue(long version) { this.version = version; } - public long time() { + public long getTime() { throw new UnsupportedOperationException(); } - public long version() { + public long getVersion() { return version; } - public boolean delete() { + public boolean isDelete() { return false; } - @Override public long ramBytesUsed() { return BASE_RAM_BYTES_USED; @@ -61,7 +61,6 @@ class VersionValue implements Accountable { @Override public String toString() { return "VersionValue{" + - "version=" + version + - '}'; + "version=" + version + "}"; } } diff --git a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java index cfab3382c18..33f55c7a916 100644 --- a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -24,7 +24,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.util.set.Sets; @@ -179,7 +179,7 @@ public final class ShardGetService extends AbstractIndexShardComponent { private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, FetchSourceContext fetchSourceContext, Engine.GetResult get, MapperService mapperService) { Map<String, GetField> fields = null; BytesReference source = null; - Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); + DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext); if (fieldVisitor != null) { try { diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java index ad21cce6674..f1b5760e901 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lucene.all.AllEntries; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import java.util.ArrayList; import java.util.Iterator; diff --git a/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java index c160f2b8cb9..9f844a3371e 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java @@ -26,32 +26,19 @@ import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexableField; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; -import org.apache.lucene.search.BoostQuery; import 
org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; -import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexNumericFieldData.NumericType; import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData; -import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.index.mapper.Mapper; -import org.elasticsearch.index.mapper.MapperParsingException; -import org.elasticsearch.index.mapper.MetadataFieldMapper; -import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.mapper.Mapper.TypeParser.ParserContext; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.query.QueryShardContext; -import org.elasticsearch.index.query.QueryShardException; import org.elasticsearch.index.seqno.SequenceNumbersService; import java.io.IOException; diff --git a/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java b/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java index 7981c891242..77d8204e45d 100644 --- a/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java +++ b/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java @@ -34,7 +34,7 @@ import org.elasticsearch.action.termvectors.TermVectorsResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -98,7 +98,7 @@ public class TermVectorsService { final Engine.Searcher searcher = indexShard.acquireSearcher("term_vector"); try { Fields topLevelFields = MultiFields.getFields(get.searcher() != null ? 
get.searcher().reader() : searcher.reader()); - Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); + DocIdAndVersion docIdAndVersion = get.docIdAndVersion(); /* from an artificial document */ if (request.doc() != null) { termVectorsByField = generateTermVectorsFromDoc(indexShard, request); diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java index a67a728edcc..d771ced56ff 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java @@ -31,17 +31,17 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.test.ESTestCase; -/** - * test per-segment lookup of version-related datastructures +/** + * test per-segment lookup of version-related data structures */ public class VersionLookupTests extends ESTestCase { - /** + /** * test version lookup actually works */ public void testSimple() throws Exception { @@ -55,20 +55,20 @@ public class VersionLookupTests extends ESTestCase { LeafReaderContext segment = reader.leaves().get(0); PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); // found doc - DocIdAndVersion result = lookup.lookup(new BytesRef("6"), null, segment); + DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); assertEquals(87, result.version); assertEquals(0, result.docId); // not found doc - assertNull(lookup.lookup(new BytesRef("7"), null, segment)); + assertNull(lookup.lookupVersion(new BytesRef("7"), null, segment)); // deleted doc - assertNull(lookup.lookup(new BytesRef("6"), new Bits.MatchNoBits(1), segment)); + assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(1), segment)); reader.close(); writer.close(); dir.close(); } - - /** + + /** * test version lookup with two documents matching the ID */ public void testTwoDocuments() throws Exception { @@ -83,26 +83,26 @@ public class VersionLookupTests extends ESTestCase { LeafReaderContext segment = reader.leaves().get(0); PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); // return the last doc when there are duplicates - DocIdAndVersion result = lookup.lookup(new BytesRef("6"), null, segment); + DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); assertEquals(87, result.version); assertEquals(1, result.docId); // delete the first doc only FixedBitSet live = new FixedBitSet(2); live.set(1); - result = lookup.lookup(new BytesRef("6"), live, segment); + result = lookup.lookupVersion(new BytesRef("6"), live, segment); assertNotNull(result); assertEquals(87, result.version); assertEquals(1, result.docId); // delete the second doc only live.clear(1); live.set(0); - result = lookup.lookup(new BytesRef("6"), live, segment); + result = lookup.lookupVersion(new BytesRef("6"), live, segment); assertNotNull(result); assertEquals(87, result.version); assertEquals(0, result.docId); // delete both docs - 
assertNull(lookup.lookup(new BytesRef("6"), new Bits.MatchNoBits(2), segment)); + assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(2), segment)); reader.close(); writer.close(); dir.close(); diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index 45693e101d4..6b9960294e4 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -38,6 +38,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadDocIdAndVersion; +import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadVersion; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -61,15 +63,15 @@ public class VersionsTests extends ESTestCase { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1)); - MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); + MatcherAssert.assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); Document doc = new Document(); doc.add(new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE)); doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1)); writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc); directoryReader = reopen(directoryReader); - assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(1L)); - assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(1L)); + assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(1L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(1L)); doc = new Document(); Field uid = new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE); @@ -78,8 +80,8 @@ public class VersionsTests extends ESTestCase { doc.add(version); writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc); directoryReader = reopen(directoryReader); - assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(2L)); - assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(2L)); + assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(2L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(2L)); // test reuse of uid field doc = new Document(); @@ -89,13 +91,13 @@ public class VersionsTests extends ESTestCase { writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc); directoryReader = reopen(directoryReader); - assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(3L)); - assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(3L)); + assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(3L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, 
"1")).version, equalTo(3L)); writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1")); directoryReader = reopen(directoryReader); - assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); - assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue()); + assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue()); directoryReader.close(); writer.close(); dir.close(); @@ -121,21 +123,21 @@ public class VersionsTests extends ESTestCase { writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1)); - assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5L)); - assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5L)); + assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5L)); version.setLongValue(6L); writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); version.setLongValue(7L); writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); directoryReader = reopen(directoryReader); - assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(7L)); - assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(7L)); + assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(7L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(7L)); writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1")); directoryReader = reopen(directoryReader); - assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); - assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue()); + assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue()); directoryReader.close(); writer.close(); dir.close(); @@ -143,7 +145,7 @@ public class VersionsTests extends ESTestCase { /** Test that version map cache works, is evicted on close, etc */ public void testCache() throws Exception { - int size = Versions.lookupStates.size(); + int size = VersionsResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -153,22 +155,22 @@ public class VersionsTests extends ESTestCase { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); // should increase cache size by 1 - assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, Versions.lookupStates.size()); + assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(size+1, VersionsResolver.lookupStates.size()); // should be cache hit - assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, 
Versions.lookupStates.size()); + assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(size+1, VersionsResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, Versions.lookupStates.size()); + assertEquals(size, VersionsResolver.lookupStates.size()); dir.close(); } /** Test that version map cache behaves properly with a filtered reader */ public void testCacheFilterReader() throws Exception { - int size = Versions.lookupStates.size(); + int size = VersionsResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -177,18 +179,18 @@ public class VersionsTests extends ESTestCase { doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87)); writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); - assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, Versions.lookupStates.size()); + assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(size+1, VersionsResolver.lookupStates.size()); // now wrap the reader DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5)); - assertEquals(87, Versions.loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6"))); // same size map: core cache key is shared - assertEquals(size+1, Versions.lookupStates.size()); + assertEquals(size+1, VersionsResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, Versions.lookupStates.size()); + assertEquals(size, VersionsResolver.lookupStates.size()); dir.close(); } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 65bebc40933..7e5d8d7e73e 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -75,17 +75,19 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.lucene.uid.VersionsResolver; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; @@ -96,6 +98,7 @@ import org.elasticsearch.index.analysis.IndexAnalyzers; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.codec.CodecService; import 
org.elasticsearch.index.engine.Engine.Searcher; +import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperForType; @@ -157,16 +160,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.LongSupplier; import java.util.function.Supplier; import static java.util.Collections.emptyMap; +import static java.util.Collections.shuffle; import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; @@ -265,11 +269,16 @@ public class InternalEngineTests extends ESTestCase { private static Document testDocumentWithTextField() { + return testDocumentWithTextField("test"); + } + + private static Document testDocumentWithTextField(String value) { Document document = testDocument(); - document.add(new TextField("value", "test", Field.Store.YES)); + document.add(new TextField("value", value, Field.Store.YES)); return document; } + private static Document testDocument() { return new Document(); } @@ -287,6 +296,8 @@ public class InternalEngineTests extends ESTestCase { document.add(seqID.seqNo); document.add(seqID.seqNoDocValue); document.add(seqID.primaryTerm); + BytesRef ref = source.toBytesRef(); + document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); return new ParsedDocument(versionField, seqID, id, type, routing, Arrays.asList(document), source, XContentType.JSON, mappingUpdate); } @@ -412,7 +423,11 @@ public class InternalEngineTests extends ESTestCase { private static final BytesReference B_1 = new BytesArray(new byte[]{1}); private static final BytesReference B_2 = new BytesArray(new byte[]{2}); private static final BytesReference B_3 = new BytesArray(new byte[]{3}); - private static final BytesArray SOURCE = new BytesArray("{}".getBytes(Charset.defaultCharset())); + private static final BytesArray SOURCE = bytesArray("{}"); + + private static BytesArray bytesArray(String string) { + return new BytesArray(string.getBytes(Charset.defaultCharset())); + } public void testSegments() throws Exception { try (Store store = createStore(); @@ -1185,124 +1200,6 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testExternalVersioningNewIndex() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(12L)); - - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), 
index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertThat(indexResult.getVersion(), equalTo(12L)); - } - - public void testVersioningIndexConflict() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = indexForDoc(doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); - - index = indexForDoc(doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); - - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 1L, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - - // future versions should not work as well - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - } - - public void testExternalVersioningIndexConflict() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(12L)); - - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(14L)); - - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - } - - public void testForceVersioningNotAllowedExceptForOlderIndices() throws Exception { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 42, VersionType.FORCE, PRIMARY, 0, -1, false); - - Engine.IndexResult indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(IllegalArgumentException.class)); - assertThat(indexResult.getFailure().getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0")); - - IndexSettings oldIndexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_0_0_beta1) - .build()); - try (Store store = createStore(); - Engine engine = createEngine(oldIndexSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 84, VersionType.FORCE, PRIMARY, 0, -1, false); - Engine.IndexResult result = engine.index(index); - assertTrue(result.hasFailure()); - assertThat(result.getFailure(), instanceOf(IllegalArgumentException.class)); - 
assertThat(result.getFailure().getMessage(), containsString("version type [FORCE] may not be used for non-translog operations")); - - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 84, VersionType.FORCE, - Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, 0, -1, false); - result = engine.index(index); - assertThat(result.getVersion(), equalTo(84L)); - } - } - - public void testVersioningIndexConflictWithFlush() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = indexForDoc(doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); - - index = indexForDoc(doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); - - engine.flush(); - - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 1L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - - // future versions should not work as well - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - } - - public void testExternalVersioningIndexConflictWithFlush() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(12L)); - - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(14L)); - - engine.flush(); - - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - } - public void testForceMerge() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), @@ -1400,78 +1297,6 @@ public class InternalEngineTests extends ESTestCase { } - public void testVersioningDeleteConflict() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = indexForDoc(doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); - - index = indexForDoc(doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); - - Engine.Delete delete = new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 1L, VersionType.INTERNAL, PRIMARY, 0); - Engine.DeleteResult result = engine.delete(delete); - assertTrue(result.hasFailure()); - assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); - - // future versions 
should not work as well - delete = new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 3L, VersionType.INTERNAL, PRIMARY, 0); - result = engine.delete(delete); - assertTrue(result.hasFailure()); - assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); - - // now actually delete - delete = new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 2L, VersionType.INTERNAL, PRIMARY, 0); - result = engine.delete(delete); - assertThat(result.getVersion(), equalTo(3L)); - - // now check if we can index to a delete doc with version - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - } - - public void testVersioningDeleteConflictWithFlush() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index index = indexForDoc(doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); - - index = indexForDoc(doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); - - engine.flush(); - - Engine.Delete delete = new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 1L, VersionType.INTERNAL, PRIMARY, 0); - Engine.DeleteResult deleteResult = engine.delete(delete); - assertTrue(deleteResult.hasFailure()); - assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - - // future versions should not work as well - delete = new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 3L, VersionType.INTERNAL, PRIMARY, 0); - deleteResult = engine.delete(delete); - assertTrue(deleteResult.hasFailure()); - assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - - engine.flush(); - - // now actually delete - delete = new Engine.Delete("test", "1", newUid(doc), SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 2L, VersionType.INTERNAL, PRIMARY, 0); - deleteResult = engine.delete(delete); - assertThat(deleteResult.getVersion(), equalTo(3L)); - - engine.flush(); - - // now check if we can index to a delete doc with version - index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - } - public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); @@ -1484,136 +1309,462 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningCreateExistsExceptionWithFlush() throws IOException { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); - Engine.IndexResult indexResult = engine.index(create); - assertThat(indexResult.getVersion(), equalTo(1L)); - - engine.flush(); - - create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); - indexResult = engine.index(create); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, boolean externalVersioning, boolean partialOldPrimary, + long primaryTerm, int minOpCount, int maxOpCount) { + final int numOfOps = randomIntBetween(minOpCount, maxOpCount); + final List<Engine.Operation> ops = new ArrayList<>(); + final Term id = newUid(Uid.createUid("test", "1")); + final int startWithSeqNo; + if (partialOldPrimary) { + startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); + } else { + startWithSeqNo = 0; + } + final String valuePrefix = forReplica ? "r_" : "p_"; + final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); + final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL; + for (int i = 0; i < numOfOps; i++) { + final Engine.Operation op; + if (randomBoolean()) { + op = new Engine.Index(id, testParsedDocument("1", "test", null, testDocumentWithTextField(valuePrefix + i), B_1, null), + forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, + forReplica || externalVersioning ? i : Versions.MATCH_ANY, + forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? REPLICA : PRIMARY, + System.currentTimeMillis(), -1, false + ); + } else { + op = new Engine.Delete("test", "1", id, + forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, + forReplica || externalVersioning ? i : Versions.MATCH_ANY, + forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? REPLICA : PRIMARY, + System.currentTimeMillis()); + } + ops.add(op); + } + return ops; } - public void testVersioningReplicaConflict1() throws IOException { - final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - final Engine.Index v1Index = indexForDoc(doc); - final Engine.IndexResult v1Result = engine.index(v1Index); - assertThat(v1Result.getVersion(), equalTo(1L)); - - final Engine.Index v2Index = indexForDoc(doc); - final Engine.IndexResult v2Result = engine.index(v2Index); - assertThat(v2Result.getVersion(), equalTo(2L)); - - // apply the second index to the replica, should work fine - final Engine.Index replicaV2Index = new Engine.Index( - newUid(doc), - doc, - v2Result.getSeqNo(), - v2Index.primaryTerm(), - v2Result.getVersion(), - VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), - REPLICA, - 0, - -1, - false); - final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index); - assertThat(replicaV2Result.getVersion(), equalTo(2L)); - - // now, the old one should produce an indexing result - final Engine.Index replicaV1Index = new Engine.Index( - newUid(doc), - doc, - v1Result.getSeqNo(), - v1Index.primaryTerm(), - v1Result.getVersion(), - VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), - REPLICA, - 0, - -1, - false); - final Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index); - assertFalse(replicaV1Result.hasFailure()); - assertFalse(replicaV1Result.isCreated()); - assertThat(replicaV1Result.getVersion(), equalTo(2L)); - - // second version on replica should fail as well - final Engine.IndexResult replicaV2ReplayResult = replicaEngine.index(replicaV2Index); - assertFalse(replicaV2Result.hasFailure()); - assertFalse(replicaV1Result.isCreated()); - assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L)); + public void testOutOfOrderDocsOnReplica() throws IOException { + final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine); } - public void testVersioningReplicaConflict2() throws IOException { - final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); - final Engine.Index v1Index = indexForDoc(doc); - final Engine.IndexResult v1Result = engine.index(v1Index); - assertThat(v1Result.getVersion(), equalTo(1L)); + public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException { + IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder() + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_4_0_UNRELEASED) + .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), + between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY))) + .build()); - // apply the first index to the replica, should work fine - final Engine.Index replicaV1Index = new Engine.Index( - newUid(doc), - doc, - v1Result.getSeqNo(), - v1Index.primaryTerm(), - v1Result.getVersion(), - VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), - REPLICA, - 0, - -1, - false); - Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index); - assertThat(replicaV1Result.getVersion(), equalTo(1L)); + try (Store oldReplicaStore = createStore(); + InternalEngine replicaEngine = + createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) { + 
final List<Engine.Operation> ops = generateSingleDocHistory(true, true, true, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine); + } + } - // index it again - final Engine.Index v2Index = indexForDoc(doc); - final Engine.IndexResult v2Result = engine.index(v2Index); - assertThat(v2Result.getVersion(), equalTo(2L)); + private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine) throws IOException { + final Engine.Operation lastOp = ops.get(ops.size() - 1); + final String lastFieldValue; + if (lastOp instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOp; + lastFieldValue = index.docs().get(0).get("value"); + } else { + // delete + lastFieldValue = null; + } + int firstOpWithSeqNo = 0; + while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { + firstOpWithSeqNo++; + } + // shuffle ops but make sure legacy ops are first + shuffle(ops.subList(0, firstOpWithSeqNo), random()); + shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); + boolean firstOp = true; + for (Engine.Operation op : ops) { + logger.info("performing [{}], v [{}], seq# [{}], term [{}]", + op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); + if (op instanceof Engine.Index) { + Engine.IndexResult result = replicaEngine.index((Engine.Index) op); + // replicas don't really care about the creation status of documents + // this allows us to ignore the case where a document was found in the live version maps in + // a delete state and return false for the created flag in favor of code simplicity. + // This check is just a signal for regression so a decision can be made if it's + // intentional + assertThat(result.isCreated(), equalTo(firstOp)); + assertThat(result.getVersion(), equalTo(op.version())); + assertThat(result.hasFailure(), equalTo(false)); - } else { + Engine.DeleteResult result = replicaEngine.delete((Engine.Delete) op); + // Replicas don't really care about the found status of documents + // this allows us to ignore the case where a document was found in the live version maps in + // a delete state and return true for the found flag in favor of code simplicity. + // This check is just a signal for regression so a decision can be made if it's + // intentional + assertThat(result.isFound(), equalTo(firstOp == false)); + assertThat(result.getVersion(), equalTo(op.version())); + assertThat(result.hasFailure(), equalTo(false)); + } + if (randomBoolean()) { + engine.refresh("test"); + } + if (randomBoolean()) { + engine.flush(); + } + firstOp = false; + } - // apply the delete on the replica (skipping the second index) - final Engine.Delete replicaDelete = new Engine.Delete( - "test", - "1", - newUid(doc), - deleteResult.getSeqNo(), - delete.primaryTerm(), - deleteResult.getVersion(), - VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), - REPLICA, - 0); - final Engine.DeleteResult replicaDeleteResult = replicaEngine.delete(replicaDelete); - assertThat(replicaDeleteResult.getVersion(), equalTo(3L)); + assertVisibleCount(replicaEngine, lastFieldValue == null ? 0 : 1); + if (lastFieldValue != null) { + try (Searcher searcher = replicaEngine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); + assertThat(collector.getTotalHits(), equalTo(1)); + } + } + } - // second time delete with same version should just produce the same version - final Engine.DeleteResult deleteReplayResult = replicaEngine.delete(replicaDelete); - assertFalse(deleteReplayResult.hasFailure()); - assertTrue(deleteReplayResult.isFound()); - assertThat(deleteReplayResult.getVersion(), equalTo(3L)); + public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { + final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 100, 300); + final Engine.Operation lastOp = ops.get(ops.size() - 1); + final String lastFieldValue; + if (lastOp instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOp; + lastFieldValue = index.docs().get(0).get("value"); + } else { + // delete + lastFieldValue = null; + } + shuffle(ops, random()); + concurrentlyApplyOps(ops, engine); - // now delete it - final Engine.Delete delete = new Engine.Delete("test", "1", newUid(doc)); - final Engine.DeleteResult deleteResult = engine.delete(delete); - assertThat(deleteResult.getVersion(), equalTo(3L)); - // now do the second index on the replica, it should result in the current version - final Engine.Index replicaV2Index = new Engine.Index( - newUid(doc), - doc, - v2Result.getSeqNo(), - v2Index.primaryTerm(), - v2Result.getVersion(), - VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), - REPLICA, - 0, - -1, - false); - final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index); - assertFalse(replicaV2Result.hasFailure()); - assertFalse(replicaV2Result.isCreated()); - assertThat(replicaV2Result.getVersion(), equalTo(3L)); + assertVisibleCount(engine, lastFieldValue == null ? 0 : 1); + if (lastFieldValue != null) { + try (Searcher searcher = engine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); + assertThat(collector.getTotalHits(), equalTo(1)); + } + } + } + + private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException { + Thread[] thread = new Thread[randomIntBetween(3, 5)]; + CountDownLatch startGun = new CountDownLatch(thread.length); + AtomicInteger offset = new AtomicInteger(-1); + for (int i = 0; i < thread.length; i++) { + thread[i] = new Thread(() -> { + startGun.countDown(); + try { + startGun.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + int docOffset; + while ((docOffset = offset.incrementAndGet()) < ops.size()) { + try { + final Engine.Operation op = ops.get(docOffset); + if (op instanceof Engine.Index) { + engine.index((Engine.Index)op); + } else { + engine.delete((Engine.Delete)op); + } + if ((docOffset + 1) % 4 == 0) { + engine.refresh("test"); + } + } catch (IOException e) { + throw new AssertionError(e); + } + } + }); + thread[i].start(); + } + for (int i = 0; i < thread.length; i++) { + thread[i].join(); + } + } + + public void testInternalVersioningOnPrimary() throws IOException { + final List<Engine.Operation> ops = generateSingleDocHistory(false, false, false, 2, 2, 20); + assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); + } + + private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) + throws IOException { + String lastFieldValue = null; + int opsPerformed = 0; + long lastOpVersion = currentOpVersion; + BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(), + index.seqNo(), index.primaryTerm(), version, index.versionType(), index.origin(), index.startTime(), + index.getAutoGeneratedIdTimestamp(), index.isRetry()); + BiFunction<Long, Engine.Delete, Engine.Delete> delWithVersion = (version, delete) -> new Engine.Delete(delete.type(), delete.id(), + delete.uid(), delete.seqNo(), delete.primaryTerm(), version, delete.versionType(), delete.origin(), delete.startTime()); + for (Engine.Operation op : ops) { + final boolean versionConflict = rarely(); + final boolean versionedOp = versionConflict || randomBoolean(); + final long conflictingVersion = docDeleted || randomBoolean() ? + lastOpVersion + (randomBoolean() ? 1 : -1) : + Versions.MATCH_DELETED; + final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion; + logger.info("performing [{}]{}{}", + op.operationType().name().charAt(0), + versionConflict ? " (conflict " + conflictingVersion + ")" : "", + versionedOp ? " (versioned " + correctVersion + ")" : ""); + if (op instanceof Engine.Index) { + final Engine.Index index = (Engine.Index) op; + if (versionConflict) { + // generate a conflict + Engine.IndexResult result = engine.index(indexWithVersion.apply(conflictingVersion, index)); + assertThat(result.isCreated(), equalTo(false)); + assertThat(result.getVersion(), equalTo(lastOpVersion)); + assertThat(result.hasFailure(), equalTo(true)); + assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); + } else { + Engine.IndexResult result = engine.index(versionedOp ? 
indexWithVersion.apply(correctVersion, index) : index); + assertThat(result.isCreated(), equalTo(docDeleted)); + assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1))); + assertThat(result.hasFailure(), equalTo(false)); + assertThat(result.getFailure(), nullValue()); + lastFieldValue = index.docs().get(0).get("value"); + docDeleted = false; + lastOpVersion = result.getVersion(); + opsPerformed++; + } + } else { + final Engine.Delete delete = (Engine.Delete) op; + if (versionConflict) { + // generate a conflict + Engine.DeleteResult result = engine.delete(delWithVersion.apply(conflictingVersion, delete)); + assertThat(result.isFound(), equalTo(docDeleted == false)); + assertThat(result.getVersion(), equalTo(lastOpVersion)); + assertThat(result.hasFailure(), equalTo(true)); + assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); + } else { + Engine.DeleteResult result = engine.delete(versionedOp ? delWithVersion.apply(correctVersion, delete) : delete); + assertThat(result.isFound(), equalTo(docDeleted == false)); + assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1))); + assertThat(result.hasFailure(), equalTo(false)); + assertThat(result.getFailure(), nullValue()); + docDeleted = true; + lastOpVersion = result.getVersion(); + opsPerformed++; + } + } + if (randomBoolean()) { + // refresh and take the chance to check everything is ok so far + assertVisibleCount(engine, docDeleted ? 0 : 1); + if (docDeleted == false) { + try (Searcher searcher = engine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); + assertThat(collector.getTotalHits(), equalTo(1)); + } + } + } + if (randomBoolean()) { + engine.flush(); + } + + if (rarely()) { + // simulate GC deletes + engine.refresh("gc_simulation"); + engine.clearDeletedTombstones(); + if (docDeleted) { + lastOpVersion = Versions.NOT_FOUND; + } + } + } + + assertVisibleCount(engine, docDeleted ? 
0 : 1); + if (docDeleted == false) { + try (Searcher searcher = engine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); + assertThat(collector.getTotalHits(), equalTo(1)); + } + } + return opsPerformed; + } + + public void testExternalVersioningOnPrimary() throws IOException { + final List ops = generateSingleDocHistory(false, true, false, 2, 2, 20); + final Engine.Operation lastOp = ops.get(ops.size() - 1); + final String lastFieldValue; + if (lastOp instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOp; + lastFieldValue = index.docs().get(0).get("value"); + } else { + // delete + lastFieldValue = null; + } + shuffle(ops, random()); + long highestOpVersion = Versions.NOT_FOUND; + long seqNo = -1; + boolean docDeleted = true; + for (Engine.Operation op : ops) { + logger.info("performing [{}], v [{}], seq# [{}], term [{}]", + op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); + if (op instanceof Engine.Index) { + final Engine.Index index = (Engine.Index) op; + Engine.IndexResult result = engine.index(index); + if (op.version() > highestOpVersion) { + seqNo++; + assertThat(result.getSeqNo(), equalTo(seqNo)); + assertThat(result.isCreated(), equalTo(docDeleted)); + assertThat(result.getVersion(), equalTo(op.version())); + assertThat(result.hasFailure(), equalTo(false)); + assertThat(result.getFailure(), nullValue()); + docDeleted = false; + highestOpVersion = op.version(); + } else { + assertThat(result.isCreated(), equalTo(false)); + assertThat(result.getVersion(), equalTo(highestOpVersion)); + assertThat(result.hasFailure(), equalTo(true)); + assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); + } + } else { + final Engine.Delete delete = (Engine.Delete) op; + Engine.DeleteResult result = engine.delete(delete); + if (op.version() > highestOpVersion) { + seqNo++; + assertThat(result.getSeqNo(), equalTo(seqNo)); + assertThat(result.isFound(), equalTo(docDeleted == false)); + assertThat(result.getVersion(), equalTo(op.version())); + assertThat(result.hasFailure(), equalTo(false)); + assertThat(result.getFailure(), nullValue()); + docDeleted = true; + highestOpVersion = op.version(); + } else { + assertThat(result.isFound(), equalTo(docDeleted == false)); + assertThat(result.getVersion(), equalTo(highestOpVersion)); + assertThat(result.hasFailure(), equalTo(true)); + assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); + } + } + if (randomBoolean()) { + engine.refresh("test"); + } + if (randomBoolean()) { + engine.flush(); + } + } + + assertVisibleCount(engine, docDeleted ? 
+        if (docDeleted == false) {
+            try (Searcher searcher = engine.acquireSearcher("test")) {
+                final TotalHitCountCollector collector = new TotalHitCountCollector();
+                searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
+                assertThat(collector.getTotalHits(), equalTo(1));
+            }
+        }
+    }
+
+    public void testVersioningPromotedReplica() throws IOException {
+        final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, true, false, 1, 2, 20);
+        List<Engine.Operation> primaryOps = generateSingleDocHistory(false, false, false, 2, 2, 20);
+        Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
+        final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
+        final long finalReplicaVersion = lastReplicaOp.version();
+        final long finalReplicaSeqNo = lastReplicaOp.seqNo();
+        assertOpsOnReplica(replicaOps, replicaEngine);
+        final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
+        final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, lastReplicaOp.uid())).v1();
+        try (Searcher searcher = engine.acquireSearcher("test")) {
+            final TotalHitCountCollector collector = new TotalHitCountCollector();
+            searcher.searcher().search(new MatchAllDocsQuery(), collector);
+            if (collector.getTotalHits() > 0) {
+                // last op wasn't delete
+                assertThat(currentSeqNo, equalTo(finalReplicaSeqNo + opsOnPrimary));
+            }
+        }
+    }
+
+    public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
+        final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 100, 300);
+        final Engine.Operation lastOp = ops.get(ops.size() - 1);
+        final String lastFieldValue;
+        if (lastOp instanceof Engine.Index) {
+            Engine.Index index = (Engine.Index) lastOp;
+            lastFieldValue = index.docs().get(0).get("value");
+        } else {
+            // delete
+            lastFieldValue = null;
+        }
+        shuffle(ops, random());
+        concurrentlyApplyOps(ops, engine);
+
+        assertVisibleCount(engine, lastFieldValue == null ? 0 : 1);
+        if (lastFieldValue != null) {
+            try (Searcher searcher = engine.acquireSearcher("test")) {
+                final TotalHitCountCollector collector = new TotalHitCountCollector();
+                searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
+                assertThat(collector.getTotalHits(), equalTo(1));
+            }
+        }
+    }
+
+    public void testConcurrentGetAndSetOnPrimary() throws IOException, InterruptedException {
+        Thread[] thread = new Thread[randomIntBetween(3, 5)];
+        CountDownLatch startGun = new CountDownLatch(thread.length);
+        final int opsPerThread = randomIntBetween(10, 20);
+        final Set<String> currentValues = ConcurrentCollections.newConcurrentSet();
+        final AtomicInteger idGenerator = new AtomicInteger();
+        ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), bytesArray(""), null);
+        final Term uidTerm = newUid(doc);
+        engine.index(indexForDoc(doc));
+        for (int i = 0; i < thread.length; i++) {
+            thread[i] = new Thread(() -> {
+                startGun.countDown();
+                try {
+                    startGun.await();
+                } catch (InterruptedException e) {
+                    throw new AssertionError(e);
+                }
+                for (int op = 0; op < opsPerThread; op++) {
+                    try (Engine.GetResult get = engine.get(new Engine.Get(true, uidTerm))) {
+                        FieldsVisitor visitor = new FieldsVisitor(true);
+                        get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
+                        List<String> values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString()));
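+                        // every third op also removes an existing value, so removals race with concurrent adds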
+                        String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null;
+                        String added = "v_" + idGenerator.incrementAndGet();
+                        values.add(added);
+                        Engine.Index index = new Engine.Index(uidTerm,
+                            testParsedDocument("1", "test", null, testDocument(),
+                                bytesArray(Strings.collectionToCommaDelimitedString(values)), null),
+                            SequenceNumbersService.UNASSIGNED_SEQ_NO, 2,
+                            get.version(), VersionType.INTERNAL,
+                            PRIMARY, System.currentTimeMillis(), -1, false);
+                        Engine.IndexResult indexResult = engine.index(index);
+                        if (indexResult.hasFailure() == false) {
+                            boolean exists = removed == null ? true : currentValues.remove(removed);
+                            assertTrue(removed + " should exist", exists);
+                            exists = currentValues.add(added);
+                            assertTrue(added + " should not exist", exists);
+                        }
+
+                    } catch (IOException e) {
+                        throw new AssertionError(e);
+                    }
+                }
+            });
+            thread[i].start();
+        }
+        for (int i = 0; i < thread.length; i++) {
+            thread[i].join();
+        }
+        try (Engine.GetResult get = engine.get(new Engine.Get(true, uidTerm))) {
+            FieldsVisitor visitor = new FieldsVisitor(true);
+            get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor);
+            List<String> values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString()));
+            assertThat(currentValues, equalTo(new HashSet<>(values)));
+        }
+    }
 
     public void testBasicCreatedFlag() throws IOException {
@@ -1633,21 +1784,6 @@ public class InternalEngineTests extends ESTestCase {
         assertTrue(indexResult.isCreated());
     }
 
-    public void testCreatedFlagAfterFlush() throws IOException {
-        ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
-        Engine.Index index = indexForDoc(doc);
-        Engine.IndexResult indexResult = engine.index(index);
-        assertTrue(indexResult.isCreated());
-
-        engine.delete(new Engine.Delete(null, "1", newUid(doc)));
-
-        engine.flush();
-
-        index = indexForDoc(doc);
-        indexResult = engine.index(index);
-        assertTrue(indexResult.isCreated());
-    }
-
     private static class MockAppender extends AbstractAppender {
         public boolean sawIndexWriterMessage;
 
@@ -2035,6 +2171,13 @@ public class InternalEngineTests extends ESTestCase {
         return new Engine.Index(newUid(doc), doc);
     }
 
+    private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo,
+                                            boolean isRetry) {
+        return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL,
+            Engine.Operation.Origin.REPLICA, System.nanoTime(),
+            IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
+    }
+
     public void testExtractShardId() {
         try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
             ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());
@@ -2122,11 +2265,7 @@ public class InternalEngineTests extends ESTestCase {
             Engine.IndexResult indexResult = engine.index(firstIndexRequest);
             assertThat(indexResult.getVersion(), equalTo(1L));
         }
-        engine.refresh("test");
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
-        }
+        assertVisibleCount(engine, numDocs);
         engine.close();
         final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class);
         if (directory != null) {
@@ -2158,9 +2297,21 @@
             // no mock directory, no fun.
             engine = createEngine(store, primaryTranslogDir);
         }
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
+        assertVisibleCount(engine, numDocs, false);
+    }
+
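+    /** Refreshes the engine (by default) and asserts that exactly {@code numDocs} documents are visible to a fresh searcher. */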
+    private static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException {
+        assertVisibleCount(engine, numDocs, true);
+    }
+
+    private static void assertVisibleCount(InternalEngine engine, int numDocs, boolean refresh) throws IOException {
+        if (refresh) {
+            engine.refresh("test");
+        }
+        try (Searcher searcher = engine.acquireSearcher("test")) {
+            final TotalHitCountCollector collector = new TotalHitCountCollector();
+            searcher.searcher().search(new MatchAllDocsQuery(), collector);
+            assertThat(collector.getTotalHits(), equalTo(numDocs));
         }
     }
 
@@ -2172,11 +2323,7 @@ public class InternalEngineTests extends ESTestCase {
             Engine.IndexResult indexResult = engine.index(firstIndexRequest);
             assertThat(indexResult.getVersion(), equalTo(1L));
         }
-        engine.refresh("test");
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
-        }
+        assertVisibleCount(engine, numDocs);
         engine.close();
         engine = new InternalEngine(engine.config());
@@ -2294,11 +2441,7 @@ public class InternalEngineTests extends ESTestCase {
             Engine.IndexResult indexResult = engine.index(firstIndexRequest);
             assertThat(indexResult.getVersion(), equalTo(1L));
         }
-        engine.refresh("test");
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
-        }
+        assertVisibleCount(engine, numDocs);
         TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer();
         parser.mappingUpdate = dynamicUpdate();
 
@@ -2307,10 +2450,7 @@
         engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); // we need to reuse the engine config unless the parser.mappingModified won't work
         engine.recoverFromTranslog();
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
-        }
+        assertVisibleCount(engine, numDocs, false);
         parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer();
         assertEquals(numDocs, parser.recoveredOps.get());
         if (parser.mappingUpdate != null) {
@@ -2322,10 +2462,7 @@
         engine.close();
         engine = createEngine(store, primaryTranslogDir);
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
-        }
+        assertVisibleCount(engine, numDocs, false);
         parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer();
         assertEquals(0, parser.recoveredOps.get());
 
@@ -2411,11 +2548,7 @@
             Engine.IndexResult index = engine.index(firstIndexRequest);
             assertThat(index.getVersion(), equalTo(1L));
         }
-        engine.refresh("test");
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
-        }
+        assertVisibleCount(engine, numDocs);
         Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
         engine.close();
 
@@ -2444,10 +2577,7 @@
         }
         engine = createEngine(store, primaryTranslogDir); // and recover again!
-        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
-            assertThat(topDocs.totalHits, equalTo(numDocs));
-        }
+        assertVisibleCount(engine, numDocs, false);
     }
 
     public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
@@ -2625,10 +2755,11 @@
         // all these simulated exceptions are not fatal to the IW so we treat them as document failures
         if (randomBoolean()) {
             throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
-            expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
+            assertThat(engine.delete(new Engine.Delete("test", "1", newUid(doc1))).getFailure(), instanceOf(IOException.class));
         } else {
             throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
-            expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
+            assertThat(engine.delete(new Engine.Delete("test", "1", newUid(doc1))).getFailure(),
+                instanceOf(IllegalArgumentException.class));
         }
 
         // test non document level failure is thrown
@@ -2667,10 +2798,10 @@
         }
     }
 
-    public void testDoubleDelivery() throws IOException {
+    public void testDoubleDeliveryPrimary() throws IOException {
         final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
             new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
-        Engine.Index operation = randomAppendOnly(doc, false, 1);
-        Engine.Index retry = randomAppendOnly(doc, true, 1);
+        Engine.Index operation = appendOnlyPrimary(doc, false, 1);
+        Engine.Index retry = appendOnlyPrimary(doc, true, 1);
         if (randomBoolean()) {
             Engine.IndexResult indexResult = engine.index(operation);
             assertFalse(engine.indexWriterHasDeletions());
@@ -2721,6 +2852,105 @@
         }
     }
 
+    public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
+        final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
+            new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+        Engine.Index operation = appendOnlyReplica(doc, false, 1, randomIntBetween(0, 20));
+        Engine.Index retry = appendOnlyReplica(doc, true, 1, operation.seqNo());
+        if (randomBoolean()) {
+            Engine.IndexResult indexResult = engine.index(operation);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(0, engine.getNumVersionLookups());
+            assertNotNull(indexResult.getTranslogLocation());
+            Engine.IndexResult retryResult = engine.index(retry);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(1, engine.getNumVersionLookups());
+            assertNotNull(retryResult.getTranslogLocation());
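+            // the retry was applied second, so its translog location must sort after the original's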
+            assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
+        } else {
+            Engine.IndexResult retryResult = engine.index(retry);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(1, engine.getNumVersionLookups());
+            assertNotNull(retryResult.getTranslogLocation());
+            Engine.IndexResult indexResult = engine.index(operation);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(2, engine.getNumVersionLookups());
+            assertNotNull(retryResult.getTranslogLocation());
+            assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
+        }
+
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+        operation = randomAppendOnly(doc, false, 1);
+        retry = randomAppendOnly(doc, true, 1);
+        if (randomBoolean()) {
+            Engine.IndexResult indexResult = engine.index(operation);
+            assertNotNull(indexResult.getTranslogLocation());
+            Engine.IndexResult retryResult = engine.index(retry);
+            assertNotNull(retryResult.getTranslogLocation());
+            assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
+        } else {
+            Engine.IndexResult retryResult = engine.index(retry);
+            assertNotNull(retryResult.getTranslogLocation());
+            Engine.IndexResult indexResult = engine.index(operation);
+            assertNotNull(retryResult.getTranslogLocation());
+            assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
+        }
+
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+    }
+
+    public void testDoubleDeliveryReplica() throws IOException {
+        final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
+            new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+        Engine.Index operation = replicaIndexForDoc(doc, 1, 20, false);
+        Engine.Index duplicate = replicaIndexForDoc(doc, 1, 20, true);
+        if (randomBoolean()) {
+            Engine.IndexResult indexResult = engine.index(operation);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(1, engine.getNumVersionLookups());
+            assertNotNull(indexResult.getTranslogLocation());
+            if (randomBoolean()) {
+                engine.refresh("test");
+            }
+            Engine.IndexResult retryResult = engine.index(duplicate);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(2, engine.getNumVersionLookups());
+            assertNotNull(retryResult.getTranslogLocation());
+            assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
+        } else {
+            Engine.IndexResult retryResult = engine.index(duplicate);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(1, engine.getNumVersionLookups());
+            assertNotNull(retryResult.getTranslogLocation());
+            if (randomBoolean()) {
+                engine.refresh("test");
+            }
+            Engine.IndexResult indexResult = engine.index(operation);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(2, engine.getNumVersionLookups());
+            assertNotNull(retryResult.getTranslogLocation());
+            assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
+        }
+
+        engine.refresh("test");
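+        // regardless of the delivery order, exactly one copy of the document must remain visible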
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+    }
+
     public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
 
@@ -2791,10 +3021,19 @@ public class InternalEngineTests extends ESTestCase {
     public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
         if (randomBoolean()) {
-            return new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY,
-                VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry);
+            return appendOnlyPrimary(doc, retry, autoGeneratedIdTimestamp);
+        } else {
+            return appendOnlyReplica(doc, retry, autoGeneratedIdTimestamp, 0);
         }
-        return new Engine.Index(newUid(doc), doc, 0, 0, 1, VersionType.EXTERNAL,
+    }
+
+    public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
+        return new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY,
+            VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry);
+    }
+
+    public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
+        return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, VersionType.EXTERNAL,
             Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry);
     }
 
@@ -2802,10 +3041,18 @@
         Thread[] thread = new Thread[randomIntBetween(3, 5)];
         int numDocs = randomIntBetween(1000, 10000);
         List<Engine.Index> docs = new ArrayList<>();
+        final boolean primary = randomBoolean();
         for (int i = 0; i < numDocs; i++) {
             final ParsedDocument doc = testParsedDocument(Integer.toString(i), "test", null,
                 testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
-            Engine.Index originalIndex = randomAppendOnly(doc, false, i);
-            Engine.Index retryIndex = randomAppendOnly(doc, true, i);
+            final Engine.Index originalIndex;
+            final Engine.Index retryIndex;
+            if (primary) {
+                originalIndex = appendOnlyPrimary(doc, false, i);
+                retryIndex = appendOnlyPrimary(doc, true, i);
+            } else {
+                originalIndex = appendOnlyReplica(doc, false, i, i * 2);
+                retryIndex = appendOnlyReplica(doc, true, i, i * 2);
+            }
             docs.add(originalIndex);
             docs.add(retryIndex);
         }
@@ -2834,14 +3081,26 @@
         for (int i = 0; i < thread.length; i++) {
             thread[i].join();
         }
-        assertEquals(0, engine.getNumVersionLookups());
-        assertEquals(0, engine.getNumIndexVersionsLookups());
+        if (primary) {
+            assertEquals(0, engine.getNumVersionLookups());
+            assertEquals(0, engine.getNumIndexVersionsLookups());
+        } else {
+            // we don't really know what order the operations will arrive and thus can't predict how many
+            // version lookups will be needed
+            assertThat(engine.getNumIndexVersionsLookups(), lessThanOrEqualTo(engine.getNumVersionLookups()));
+        }
         engine.refresh("test");
         try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
             TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
             assertEquals(numDocs, topDocs.totalHits);
         }
-        assertTrue(engine.indexWriterHasDeletions());
+        if (primary) {
+            // primaries rely on lucene dedup and may index the same document twice
+            assertTrue(engine.indexWriterHasDeletions());
+        } else {
+            // replicas rely on seq# based dedup and in this setup (same seq#) should never rely on lucene
+            assertFalse(engine.indexWriterHasDeletions());
+        }
     }
 
     public void testEngineMaxTimestampIsInitialized() throws IOException {
@@ -3001,7 +3260,7 @@
         seqID = getSequenceID(engine, new Engine.Get(false, newUid(doc)));
         logger.info("--> got seqID: {}", seqID);
         assertThat(seqID.v1(), equalTo(0L));
-        assertThat(seqID.v2(), equalTo(0L));
+        assertThat(seqID.v2(), equalTo(2L));
 
         // Index the same document again
         document = testDocumentWithTextField();
@@ -3013,13 +3272,13 @@
         seqID = getSequenceID(engine, new Engine.Get(false, newUid(doc)));
         logger.info("--> got seqID: {}", seqID);
         assertThat(seqID.v1(), equalTo(1L));
-        assertThat(seqID.v2(), equalTo(0L));
+        assertThat(seqID.v2(), equalTo(2L));
 
         // Index the same document for the third time, this time changing the primary term
         document = testDocumentWithTextField();
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
         doc = testParsedDocument("1", "test", null, document, B_1, null);
-        engine.index(new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 1,
+        engine.index(new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 3,
             Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY,
             System.nanoTime(), -1, false));
         engine.refresh("test");
@@ -3027,7 +3286,7 @@
         seqID = getSequenceID(engine, new Engine.Get(false, newUid(doc)));
         logger.info("--> got seqID: {}", seqID);
         assertThat(seqID.v1(), equalTo(2L));
-        assertThat(seqID.v2(), equalTo(1L));
+        assertThat(seqID.v2(), equalTo(3L));
 
         // we can query by the _seq_no
         Engine.Searcher searchResult = engine.acquireSearcher("test");
@@ -3113,7 +3372,7 @@
     }
 
     public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica() throws IOException {
-        final long v = Versions.MATCH_ANY;
+        final long v = 1;
         final VersionType t = VersionType.EXTERNAL;
         final long ts = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
         final int docs = randomIntBetween(1, 32);
@@ -3150,6 +3409,7 @@
         }
     }
 
+    /** Tests engine behavior when sequence numbers arrive out of order together with version conflicts. */
     public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOException {
         final List<Engine.Operation> operations = new ArrayList<>();
 
@@ -3278,16 +3538,12 @@
      * second is the primary term.
      */
     private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws EngineException {
-        final Searcher searcher = engine.acquireSearcher("get");
-        try {
-            long seqNum = Versions.loadSeqNo(searcher.reader(), get.uid());
-            long primaryTerm = Versions.loadPrimaryTerm(searcher.reader(), get.uid());
-            return new Tuple(seqNum, primaryTerm);
+        try (Searcher searcher = engine.acquireSearcher("get")) {
+            long seqNum = VersionsResolver.loadSeqNo(searcher.reader(), get.uid());
+            long primaryTerm = VersionsResolver.loadPrimaryTerm(searcher.reader(), get.uid());
+            return new Tuple<>(seqNum, primaryTerm);
         } catch (Exception e) {
-            Releasables.closeWhileHandlingException(searcher);
             throw new EngineException(shardId, "unable to retrieve sequence id", e);
-        } finally {
-            searcher.close();
         }
     }
 
diff --git a/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java b/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java
index e66f55ff676..7af8ebc7580 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java
@@ -20,8 +20,6 @@ package org.elasticsearch.index.engine;
 
 import org.apache.lucene.util.RamUsageTester;
-import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.index.translog.TranslogTests;
 import org.elasticsearch.test.ESTestCase;
 
 public class VersionValueTests extends ESTestCase {
diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 39d8778c2a4..7a11f89b73b 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -98,13 +98,9 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             future.get();
             thread.join();
             shards.assertAllEqual(numDocs);
-            Engine engine = IndexShardTests.getEngineFromShard(replica);
-            assertEquals("expected at no version lookups ", InternalEngineTests.getNumVersionLookups((InternalEngine) engine), 0);
-            for (IndexShard shard : shards) {
-                engine = IndexShardTests.getEngineFromShard(shard);
-                assertEquals(0, InternalEngineTests.getNumIndexVersionsLookups((InternalEngine) engine));
-                assertEquals(0, InternalEngineTests.getNumVersionLookups((InternalEngine) engine));
-            }
+            Engine engine = IndexShardTests.getEngineFromShard(shards.getPrimary());
+            assertEquals(0, InternalEngineTests.getNumIndexVersionsLookups((InternalEngine) engine));
+            assertEquals(0, InternalEngineTests.getNumVersionLookups((InternalEngine) engine));
         }
     }
 
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 3858b6647f9..743510e373c 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -49,7 +49,6 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.SegmentsStats;
-import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;