From ecf81688fb0c0d0f0f85300d0cf9026d7eb535ef Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 14 Apr 2017 21:46:17 +0200 Subject: [PATCH] Use sequence numbers to identify out of order delivery in replicas & recovery (#24060) Internal indexing requests in Elasticsearch may be processed out of order and repeatedly. This is important during recovery and due to concurrency in replicating requests between primary and replicas. As such, a replica/recovering shard needs to be able to identify that an incoming request contains information that is old and thus need not be processed. The current logic is based on external version. This is sadly not sufficient. This PR moves the logic to rely on sequences numbers and primary terms which give the semantics we need. Relates to #10708 --- ... => PerThreadIDVersionAndSeqNoLookup.java} | 37 ++- .../lucene/uid/VersionsAndSeqNoResolver.java | 180 ++++++++++++ .../common/lucene/uid/VersionsResolver.java | 263 ------------------ .../index/engine/DeleteVersionValue.java | 15 +- .../elasticsearch/index/engine/Engine.java | 6 +- .../index/engine/InternalEngine.java | 92 ++++-- .../index/engine/LiveVersionMap.java | 8 +- .../index/engine/VersionValue.java | 24 +- .../index/get/ShardGetService.java | 2 +- .../index/mapper/ParseContext.java | 14 +- .../index/mapper/ParsedDocument.java | 4 +- .../index/mapper/SeqNoFieldMapper.java | 15 +- .../index/termvectors/TermVectorsService.java | 2 +- .../common/lucene/uid/VersionLookupTests.java | 6 +- .../common/lucene/uid/VersionsTests.java | 20 +- .../index/IndexingSlowLogTests.java | 2 +- .../index/engine/InternalEngineTests.java | 30 +- .../index/engine/LiveVersionMapTests.java | 4 +- .../index/engine/VersionValueTests.java | 4 +- .../ESIndexLevelReplicationTestCase.java | 26 +- .../IndexLevelReplicationTests.java | 28 ++ .../index/shard/IndexShardIT.java | 2 +- .../index/shard/IndexShardTests.java | 2 +- .../index/shard/RefreshListenersTests.java | 2 +- 
.../index/translog/TranslogTests.java | 2 +- .../recovery/RecoverySourceHandlerTests.java | 2 +- 26 files changed, 414 insertions(+), 378 deletions(-) rename core/src/main/java/org/elasticsearch/common/lucene/uid/{PerThreadIDAndVersionLookup.java => PerThreadIDVersionAndSeqNoLookup.java} (73%) create mode 100644 core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java delete mode 100644 core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java similarity index 73% rename from core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java rename to core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index caaf7fc84af..80977618c4b 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -29,9 +29,12 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbersService; import java.io.IOException; @@ -43,7 +46,7 @@ import java.io.IOException; * in more than one document! It will only return the first one it * finds. 
*/ -final class PerThreadIDAndVersionLookup { +final class PerThreadIDVersionAndSeqNoLookup { // TODO: do we really need to store all this stuff? some if it might not speed up anything. // we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff @@ -51,7 +54,10 @@ final class PerThreadIDAndVersionLookup { private final TermsEnum termsEnum; /** _version data */ private final NumericDocValues versions; - + /** _seq_no data */ + private final NumericDocValues seqNos; + /** _primary_term data */ + private final NumericDocValues primaryTerms; /** Reused for iteration (when the term exists) */ private PostingsEnum docsEnum; @@ -61,7 +67,7 @@ final class PerThreadIDAndVersionLookup { /** * Initialize lookup for the provided segment */ - PerThreadIDAndVersionLookup(LeafReader reader) throws IOException { + PerThreadIDVersionAndSeqNoLookup(LeafReader reader) throws IOException { Fields fields = reader.fields(); Terms terms = fields.terms(UidFieldMapper.NAME); termsEnum = terms.iterator(); @@ -74,6 +80,8 @@ final class PerThreadIDAndVersionLookup { throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field"); } + seqNos = reader.getNumericDocValues(SeqNoFieldMapper.NAME); + primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); Object readerKey = null; assert (readerKey = reader.getCoreCacheKey()) != null; this.readerKey = readerKey; @@ -113,4 +121,25 @@ final class PerThreadIDAndVersionLookup { return DocIdSetIterator.NO_MORE_DOCS; } } + + /** Return null if id is not found. */ + DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException { + assert context.reader().getCoreCacheKey().equals(readerKey) : + "context's reader is not the same as the reader class was initialized on."; + int docID = getDocID(id, liveDocs); + if (docID != DocIdSetIterator.NO_MORE_DOCS) { + return new DocIdAndSeqNo(docID, seqNos == null ? 
SequenceNumbersService.UNASSIGNED_SEQ_NO : seqNos.get(docID), context); + } else { + return null; + } + } + + /** + * returns 0 if the primary term is not found. + * + * Note that 0 is an illegal primary term. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} + **/ + long lookUpPrimaryTerm(int docID) throws IOException { + return primaryTerms == null ? 0 : primaryTerms.get(docID); + } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java new file mode 100644 index 00000000000..1cbae29a3da --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -0,0 +1,180 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.lucene.uid; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReader.CoreClosedListener; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.Term; +import org.apache.lucene.util.CloseableThreadLocal; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.mapper.UidFieldMapper; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; + +/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */ +public final class VersionsAndSeqNoResolver { + + static final ConcurrentMap> lookupStates = + ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + + // Evict this reader from lookupStates once it's closed: + private static final CoreClosedListener removeLookupState = key -> { + CloseableThreadLocal ctl = lookupStates.remove(key); + if (ctl != null) { + ctl.close(); + } + }; + + private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader) throws IOException { + Object key = reader.getCoreCacheKey(); + CloseableThreadLocal ctl = lookupStates.get(key); + if (ctl == null) { + // First time we are seeing this reader's core; make a new CTL: + ctl = new CloseableThreadLocal<>(); + CloseableThreadLocal other = lookupStates.putIfAbsent(key, ctl); + if (other == null) { + // Our CTL won, we must remove it when the core is closed: + reader.addCoreClosedListener(removeLookupState); + } else { + // Another thread beat us to it: just use their CTL: + ctl = other; + } + } + + PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get(); + if (lookupState == null) { + lookupState = new PerThreadIDVersionAndSeqNoLookup(reader); + ctl.set(lookupState); + } + + return lookupState; + } + + private 
VersionsAndSeqNoResolver() { + } + + /** Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and a version. */ + public static class DocIdAndVersion { + public final int docId; + public final long version; + public final LeafReaderContext context; + + DocIdAndVersion(int docId, long version, LeafReaderContext context) { + this.docId = docId; + this.version = version; + this.context = context; + } + } + + /** Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and a seqNo. */ + public static class DocIdAndSeqNo { + public final int docId; + public final long seqNo; + public final LeafReaderContext context; + + DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) { + this.docId = docId; + this.seqNo = seqNo; + this.context = context; + } + } + + + /** + * Load the internal doc ID and version for the uid from the reader, returning
<ul> + *
<li>null if the uid wasn't found, + *
<li>a doc ID and a version otherwise + * </ul>
+ */ + public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException { + assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field(); + List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + LeafReaderContext context = leaves.get(i); + LeafReader leaf = context.reader(); + PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf); + DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context); + if (result != null) { + return result; + } + } + return null; + } + + /** + * Load the internal doc ID and sequence number for the uid from the reader, returning
<ul> + *
<li>null if the uid wasn't found, + *
<li>a doc ID and the associated seqNo otherwise + * </ul>
+ */ + public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException { + assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field(); + List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for (int i = leaves.size() - 1; i >= 0; i--) { + LeafReaderContext context = leaves.get(i); + LeafReader leaf = context.reader(); + PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf); + DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context); + if (result != null) { + return result; + } + } + return null; + } + + /** + * Load the primaryTerm associated with the given {@link DocIdAndSeqNo} + */ + public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException { + LeafReader leaf = docIdAndSeqNo.context.reader(); + PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf); + long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId); + assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]" + + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]"; + return result; + } + + /** + * Load the version for the uid from the reader, returning
<ul> + *
<li>{@link Versions#NOT_FOUND} if no matching doc exists, + *
<li>the version associated with the provided uid otherwise + * </ul>
+ */ + public static long loadVersion(IndexReader reader, Term term) throws IOException { + final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); + return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; + } +} diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java deleted file mode 100644 index fb5875cbae5..00000000000 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsResolver.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.common.lucene.uid; - -import org.apache.lucene.index.Fields; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReader.CoreClosedListener; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.Terms; -import org.apache.lucene.index.TermsEnum; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.CloseableThreadLocal; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; -import org.elasticsearch.index.mapper.UidFieldMapper; -import org.elasticsearch.index.seqno.SequenceNumbersService; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - -import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; - -/** Utility class to resolve the Lucene doc ID and version for a given uid. 
*/ -public class VersionsResolver { - - static final ConcurrentMap> - lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - - // Evict this reader from lookupStates once it's closed: - private static final CoreClosedListener removeLookupState = key -> { - CloseableThreadLocal ctl = lookupStates.remove(key); - if (ctl != null) { - ctl.close(); - } - }; - - private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) - throws IOException { - Object key = reader.getCoreCacheKey(); - CloseableThreadLocal ctl = lookupStates.get(key); - if (ctl == null) { - // First time we are seeing this reader's core; make a - // new CTL: - ctl = new CloseableThreadLocal<>(); - CloseableThreadLocal other = - lookupStates.putIfAbsent(key, ctl); - if (other == null) { - // Our CTL won, we must remove it when the - // core is closed: - reader.addCoreClosedListener(removeLookupState); - } else { - // Another thread beat us to it: just use - // their CTL: - ctl = other; - } - } - - PerThreadIDAndVersionLookup lookupState = ctl.get(); - if (lookupState == null) { - lookupState = new PerThreadIDAndVersionLookup(reader); - ctl.set(lookupState); - } - - return lookupState; - } - - private VersionsResolver() { - } - - /** - * Wraps an {@link LeafReaderContext}, a doc ID relative to the context doc base and - * a version. - **/ - public static class DocIdAndVersion { - public final int docId; - public final long version; - public final LeafReaderContext context; - - public DocIdAndVersion(int docId, long version, LeafReaderContext context) { - this.docId = docId; - this.version = version; - this.context = context; - } - } - - /** - * Load the internal doc ID and version for the uid from the reader, returning
<ul> - *
<li>null if the uid wasn't found, - *
<li>a doc ID and a version otherwise - * </ul>
- */ - public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) - throws IOException { - assert term.field().equals(UidFieldMapper.NAME); - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return null; - } - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReaderContext context = leaves.get(i); - LeafReader leaf = context.reader(); - PerThreadIDAndVersionLookup lookup = getLookupState(leaf); - DocIdAndVersion result = - lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context); - if (result != null) { - return result; - } - } - return null; - } - - /** - * Load the version for the uid from the reader, returning
<ul> - *
<li>{@link Versions#NOT_FOUND} if no matching doc exists, - *
<li>the version associated with the provided uid otherwise - * </ul>
- */ - public static long loadVersion(IndexReader reader, Term term) throws IOException { - final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); - return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; - } - - /** - * Returns the sequence number for the given uid term, returning - * {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found. - */ - public static long loadSeqNo(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid"; - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReader leaf = leaves.get(i).reader(); - Bits liveDocs = leaf.getLiveDocs(); - - TermsEnum termsEnum = null; - SortedNumericDocValues dvField = null; - PostingsEnum docsEnum = null; - - final Fields fields = leaf.fields(); - if (fields != null) { - Terms terms = fields.terms(UidFieldMapper.NAME); - if (terms != null) { - termsEnum = terms.iterator(); - assert termsEnum != null; - dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME); - assert dvField != null; - - final BytesRef id = term.bytes(); - if (termsEnum.seekExact(id)) { - // there may be more than one matching docID, in the - // case of nested docs, so we want the last one: - docsEnum = termsEnum.postings(docsEnum, 0); - int docID = DocIdSetIterator.NO_MORE_DOCS; - for (int d = docsEnum.nextDoc(); - d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { - if (liveDocs != null && liveDocs.get(d) == false) { - continue; - } - docID = d; - } - - if (docID != DocIdSetIterator.NO_MORE_DOCS) { - dvField.setDocument(docID); - assert dvField.count() == 1 : - "expected only a single value for _seq_no but got " + - dvField.count(); - return dvField.valueAt(0); - } 
- } - } - } - - } - return SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - - /** - * Returns the primary term for the given uid term, returning {@code 0} if none is found. - */ - public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid"; - List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return 0; - } - - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReader leaf = leaves.get(i).reader(); - Bits liveDocs = leaf.getLiveDocs(); - - TermsEnum termsEnum = null; - NumericDocValues dvField = null; - PostingsEnum docsEnum = null; - - final Fields fields = leaf.fields(); - if (fields != null) { - Terms terms = fields.terms(UidFieldMapper.NAME); - if (terms != null) { - termsEnum = terms.iterator(); - assert termsEnum != null; - dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - assert dvField != null; - - final BytesRef id = term.bytes(); - if (termsEnum.seekExact(id)) { - // there may be more than one matching docID, in the - // case of nested docs, so we want the last one: - docsEnum = termsEnum.postings(docsEnum, 0); - int docID = DocIdSetIterator.NO_MORE_DOCS; - for (int d = docsEnum.nextDoc(); - d != DocIdSetIterator.NO_MORE_DOCS; - d = docsEnum.nextDoc()) { - if (liveDocs != null && liveDocs.get(d) == false) { - continue; - } - docID = d; - } - - if (docID != DocIdSetIterator.NO_MORE_DOCS) { - return dvField.get(docID); - } - } - } - } - - } - return 0; - } -} diff --git a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index 45b28d96ba2..899c06eb196 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ 
b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -27,18 +27,13 @@ class DeleteVersionValue extends VersionValue { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(DeleteVersionValue.class); - private final long time; + final long time; - DeleteVersionValue(long version, long time) { - super(version); + DeleteVersionValue(long version,long seqNo, long term, long time) { + super(version, seqNo, term); this.time = time; } - @Override - public long getTime() { - return this.time; - } - @Override public boolean isDelete() { return true; @@ -52,7 +47,9 @@ class DeleteVersionValue extends VersionValue { @Override public String toString() { return "DeleteVersionValue{" + - "version=" + getVersion() + + "version=" + version + + ", seqNo=" + seqNo + + ", term=" + term + ",time=" + time + '}'; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 217bc459282..59655abf289 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -55,8 +55,8 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.lucene.uid.VersionsResolver; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -464,7 +464,7 @@ public abstract class Engine implements Closeable { final Searcher searcher = searcherFactory.apply("get"); final 
DocIdAndVersion docIdAndVersion; try { - docIdAndVersion = VersionsResolver.loadDocIdAndVersion(searcher.reader(), get.uid()); + docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid()); } catch (Exception e) { Releasables.closeWhileHandlingException(searcher); //TODO: A better exception goes here diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 333dd769eaf..b3053ba3f2d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -51,7 +51,8 @@ import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.lucene.uid.VersionsResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -389,10 +390,10 @@ public class InternalEngine extends Engine { if (versionValue.isDelete()) { return GetResult.NOT_EXISTS; } - if (get.versionType().isVersionConflictForReads(versionValue.getVersion(), get.version())) { + if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) { Uid uid = Uid.createUid(get.uid().text()); throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), - get.versionType().explainConflictForReads(versionValue.getVersion(), get.version())); + get.versionType().explainConflictForReads(versionValue.version, get.version())); } refresh("realtime_get"); } @@ -416,6 +417,43 @@ public class InternalEngine 
extends Engine { LUCENE_DOC_NOT_FOUND } + private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { + assert op.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; + final OpVsLuceneDocStatus status; + final VersionValue versionValue = versionMap.getUnderLock(op.uid()); + assert incrementVersionLookup(); + if (versionValue != null) { + if (op.seqNo() > versionValue.seqNo || + (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) + status = OpVsLuceneDocStatus.OP_NEWER; + else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } else { + // load from index + assert incrementIndexVersionLookup(); + try (Searcher searcher = acquireSearcher("load_seq_no")) { + DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); + if (docAndSeqNo == null) { + status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; + } else if (op.seqNo() > docAndSeqNo.seqNo) { + status = OpVsLuceneDocStatus.OP_NEWER; + } else if (op.seqNo() == docAndSeqNo.seqNo) { + // load term to tie break + final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo); + if (op.primaryTerm() > existingTerm) { + status = OpVsLuceneDocStatus.OP_NEWER; + } else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } + } + return status; + } + /** resolves the current version of the document, returning null if not found */ private VersionValue resolveDocVersion(final Operation op) throws IOException { assert incrementVersionLookup(); // used for asserting in tests @@ -424,11 +462,10 @@ public class InternalEngine extends Engine { assert incrementIndexVersionLookup(); // used for asserting in tests final long currentVersion = loadCurrentVersionFromIndex(op.uid()); if (currentVersion != Versions.NOT_FOUND) { - versionValue = new VersionValue(currentVersion); + 
versionValue = new VersionValue(currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0L); } } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() && - (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.getTime()) > - getGcDeletesInMillis()) { + (engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) { versionValue = null; } return versionValue; @@ -436,12 +473,13 @@ public class InternalEngine extends Engine { private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation op) throws IOException { + assert op.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO : "op is resolved based on versions but have a seq#"; assert op.version() >= 0 : "versions should be non-negative. got " + op.version(); final VersionValue versionValue = resolveDocVersion(op); if (versionValue == null) { return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else { - return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ? + return op.versionType().isVersionConflictForWrites(versionValue.version, op.version(), versionValue.isDelete()) ? 
OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER; } } @@ -601,7 +639,16 @@ public class InternalEngine extends Engine { // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index); + final OpVsLuceneDocStatus opVsLucene; + if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + } else { + // This can happen if the primary is still on an old node and send traffic without seq# or we recover from translog + // created by an old version. + assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) : + "index is newly created but op has no sequence numbers. op: " + index; + opVsLucene = compareOpToLuceneDocBasedOnVersions(index); + } if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { @@ -633,7 +680,7 @@ public class InternalEngine extends Engine { currentVersion = Versions.NOT_FOUND; currentNotFoundOrDeleted = true; } else { - currentVersion = versionValue.getVersion(); + currentVersion = versionValue.version; currentNotFoundOrDeleted = versionValue.isDelete(); } if (index.versionType().isVersionConflictForWrites( @@ -671,9 +718,9 @@ public class InternalEngine extends Engine { assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); index(index.docs(), indexWriter); } - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(plan.versionForIndexing)); - return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, - plan.currentNotFoundOrDeleted); + versionMap.putUnderLock(index.uid().bytes(), + new 
VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); + return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { /* There is no tragic event recorded so this must be a document failure. @@ -873,7 +920,14 @@ public class InternalEngine extends Engine { // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(delete); + final OpVsLuceneDocStatus opVsLucene; + if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); + } else { + assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) : + "index is newly created but op has no sequence numbers. 
op: " + delete; + opVsLucene = compareOpToLuceneDocBasedOnVersions(delete); + } final DeletionStrategy plan; if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { @@ -898,7 +952,7 @@ public class InternalEngine extends Engine { currentVersion = Versions.NOT_FOUND; currentlyDeleted = true; } else { - currentVersion = versionValue.getVersion(); + currentVersion = versionValue.version; currentlyDeleted = versionValue.isDelete(); } final DeletionStrategy plan; @@ -923,7 +977,7 @@ public class InternalEngine extends Engine { indexWriter.deleteDocuments(delete.uid()); } versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(plan.versionOfDeletion, + new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis())); return new DeleteResult( plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); @@ -1235,14 +1289,14 @@ public class InternalEngine extends Engine { // TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock... // we only need to prune the deletes map; the current/old version maps are cleared on refresh: - for (Map.Entry entry : versionMap.getAllTombstones()) { + for (Map.Entry entry : versionMap.getAllTombstones()) { BytesRef uid = entry.getKey(); try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? 
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: - VersionValue versionValue = versionMap.getTombstoneUnderLock(uid); + DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid); if (versionValue != null) { - if (timeMSec - versionValue.getTime() > getGcDeletesInMillis()) { + if (timeMSec - versionValue.time > getGcDeletesInMillis()) { versionMap.removeTombstoneUnderLock(uid); } } @@ -1490,7 +1544,7 @@ public class InternalEngine extends Engine { private long loadCurrentVersionFromIndex(Term uid) throws IOException { assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_version")) { - return VersionsResolver.loadVersion(searcher.reader(), uid); + return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 7233420309c..9ee4bd43c21 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -55,7 +55,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { } // All deletes also go here, and delete "tombstones" are retained after refresh: - private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private volatile Maps maps = new Maps(); @@ -180,7 +180,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { final VersionValue prevTombstone; if (version.isDelete()) { // Also enroll the delete into tombstones, and account for its RAM too: - prevTombstone = tombstones.put(uid, version); + prevTombstone = tombstones.put(uid, (DeleteVersionValue)version); // We initially account for 
BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up // on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift @@ -225,12 +225,12 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { } /** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */ - VersionValue getTombstoneUnderLock(BytesRef uid) { + DeleteVersionValue getTombstoneUnderLock(BytesRef uid) { return tombstones.get(uid); } /** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */ - Iterable> getAllTombstones() { + Iterable> getAllTombstones() { return tombstones.entrySet(); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java index 53550578cc3..1c2fa300520 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -30,18 +30,17 @@ class VersionValue implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class); /** the version of the document. 
used for versioned indexed operations and as a BWC layer, where no seq# are set yet */ - private final long version; + final long version; - VersionValue(long version) { + /** the seq number of the operation that last changed the associated uuid */ + final long seqNo; + /** the term of the operation that last changed the associated uuid */ + final long term; + + VersionValue(long version, long seqNo, long term) { this.version = version; - } - - public long getTime() { - throw new UnsupportedOperationException(); - } - - public long getVersion() { - return version; + this.seqNo = seqNo; + this.term = term; } public boolean isDelete() { @@ -61,6 +60,9 @@ class VersionValue implements Accountable { @Override public String toString() { return "VersionValue{" + - "version=" + version + "}"; + "version=" + version + + ", seqNo=" + seqNo + + ", term=" + term + + '}'; } } diff --git a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 33f55c7a916..6d3e1e3ab6a 100644 --- a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -24,7 +24,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.util.set.Sets; diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java index f1b5760e901..64c4932e470 100644 --- 
a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java @@ -254,12 +254,12 @@ public abstract class ParseContext { } @Override - public SeqNoFieldMapper.SequenceID seqID() { + public SeqNoFieldMapper.SequenceIDFields seqID() { return in.seqID(); } @Override - public void seqID(SeqNoFieldMapper.SequenceID seqID) { + public void seqID(SeqNoFieldMapper.SequenceIDFields seqID) { in.seqID(seqID); } @@ -310,7 +310,7 @@ public abstract class ParseContext { private Field version; - private SeqNoFieldMapper.SequenceID seqID; + private SeqNoFieldMapper.SequenceIDFields seqID; private final AllEntries allEntries; @@ -404,12 +404,12 @@ public abstract class ParseContext { } @Override - public SeqNoFieldMapper.SequenceID seqID() { + public SeqNoFieldMapper.SequenceIDFields seqID() { return this.seqID; } @Override - public void seqID(SeqNoFieldMapper.SequenceID seqID) { + public void seqID(SeqNoFieldMapper.SequenceIDFields seqID) { this.seqID = seqID; } @@ -539,9 +539,9 @@ public abstract class ParseContext { public abstract void version(Field version); - public abstract SeqNoFieldMapper.SequenceID seqID(); + public abstract SeqNoFieldMapper.SequenceIDFields seqID(); - public abstract void seqID(SeqNoFieldMapper.SequenceID seqID); + public abstract void seqID(SeqNoFieldMapper.SequenceIDFields seqID); public final boolean includeInAll(Boolean includeInAll, FieldMapper mapper) { return includeInAll(includeInAll, mapper.fieldType().indexOptions() != IndexOptions.NONE); diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java index f7d5804be0d..91cf2aa4fa4 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java @@ -36,7 +36,7 @@ public class ParsedDocument { private final String id, type; private 
final BytesRef uid; - private final SeqNoFieldMapper.SequenceID seqID; + private final SeqNoFieldMapper.SequenceIDFields seqID; private final String routing; @@ -50,7 +50,7 @@ public class ParsedDocument { private String parent; public ParsedDocument(Field version, - SeqNoFieldMapper.SequenceID seqID, + SeqNoFieldMapper.SequenceIDFields seqID, String id, String type, String routing, diff --git a/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java index 9f844a3371e..9612d94e661 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.mapper; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; -import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexableField; @@ -66,13 +65,13 @@ public class SeqNoFieldMapper extends MetadataFieldMapper { * A sequence ID, which is made up of a sequence number (both the searchable * and doc_value version of the field) and the primary term. 
*/ - public static class SequenceID { + public static class SequenceIDFields { public final Field seqNo; public final Field seqNoDocValue; public final Field primaryTerm; - public SequenceID(Field seqNo, Field seqNoDocValue, Field primaryTerm) { + public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) { Objects.requireNonNull(seqNo, "sequence number field cannot be null"); Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null"); Objects.requireNonNull(primaryTerm, "primary term field cannot be null"); @@ -81,9 +80,9 @@ public class SeqNoFieldMapper extends MetadataFieldMapper { this.primaryTerm = primaryTerm; } - public static SequenceID emptySeqID() { - return new SequenceID(new LongPoint(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), - new SortedNumericDocValuesField(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), + public static SequenceIDFields emptySeqID() { + return new SequenceIDFields(new LongPoint(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), + new NumericDocValuesField(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO), new NumericDocValuesField(PRIMARY_TERM_NAME, 0)); } } @@ -242,7 +241,7 @@ public class SeqNoFieldMapper extends MetadataFieldMapper { protected void parseCreateField(ParseContext context, List fields) throws IOException { // see InternalEngine.innerIndex to see where the real version value is set // also see ParsedDocument.updateSeqID (called by innerIndex) - SequenceID seqID = SequenceID.emptySeqID(); + SequenceIDFields seqID = SequenceIDFields.emptySeqID(); context.seqID(seqID); fields.add(seqID.seqNo); fields.add(seqID.seqNoDocValue); @@ -264,7 +263,7 @@ public class SeqNoFieldMapper extends MetadataFieldMapper { for (int i = 1; i < context.docs().size(); i++) { final Document doc = context.docs().get(i); doc.add(new LongPoint(NAME, 1)); - doc.add(new SortedNumericDocValuesField(NAME, 1L)); + doc.add(new NumericDocValuesField(NAME, 1L)); doc.add(new 
NumericDocValuesField(PRIMARY_TERM_NAME, 0L)); } } diff --git a/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java b/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java index 77d8204e45d..6351282a38a 100644 --- a/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java +++ b/core/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java @@ -34,7 +34,7 @@ import org.elasticsearch.action.termvectors.TermVectorsResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java index d771ced56ff..8b68e769570 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java @@ -31,7 +31,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.test.ESTestCase; @@ -53,7 +53,7 @@ public class VersionLookupTests extends ESTestCase { writer.addDocument(doc); DirectoryReader reader = 
DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader()); // found doc DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); @@ -81,7 +81,7 @@ public class VersionLookupTests extends ESTestCase { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader()); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader()); // return the last doc when there are duplicates DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); assertNotNull(result); diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index 6b9960294e4..c5e66a3bf2a 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -38,8 +38,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadDocIdAndVersion; -import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadVersion; +import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion; +import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadVersion; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -145,7 +145,7 @@ public class VersionsTests extends ESTestCase { /** Test that version map cache works, is evicted on close, etc */ public void testCache() 
throws Exception { - int size = VersionsResolver.lookupStates.size(); + int size = VersionsAndSeqNoResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -156,21 +156,21 @@ public class VersionsTests extends ESTestCase { DirectoryReader reader = DirectoryReader.open(writer); // should increase cache size by 1 assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // should be cache hit assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, VersionsResolver.lookupStates.size()); + assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size()); dir.close(); } /** Test that version map cache behaves properly with a filtered reader */ public void testCacheFilterReader() throws Exception { - int size = VersionsResolver.lookupStates.size(); + int size = VersionsAndSeqNoResolver.lookupStates.size(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); @@ -180,17 +180,17 @@ public class VersionsTests extends ESTestCase { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // now wrap the reader DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5)); assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6"))); // same size 
map: core cache key is shared - assertEquals(size+1, VersionsResolver.lookupStates.size()); + assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); reader.close(); writer.close(); // core should be evicted from the map - assertEquals(size, VersionsResolver.lookupStates.size()); + assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size()); dir.close(); } } diff --git a/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java index c815b2b55f9..daf196da7ce 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java @@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.startsWith; public class IndexingSlowLogTests extends ESTestCase { public void testSlowLogParsedDocumentPrinterSourceToLog() throws IOException { BytesReference source = JsonXContent.contentBuilder().startObject().field("foo", "bar").endObject().bytes(); - ParsedDocument pd = new ParsedDocument(new NumericDocValuesField("version", 1), SeqNoFieldMapper.SequenceID.emptySeqID(), "id", + ParsedDocument pd = new ParsedDocument(new NumericDocValuesField("version", 1), SeqNoFieldMapper.SequenceIDFields.emptySeqID(), "id", "test", null, null, source, XContentType.JSON, null); Index index = new Index("foo", "123"); // Turning off document logging doesn't log source[] diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c78374a0e9e..ba84c1b70b9 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -83,7 +83,8 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import 
org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.lucene.uid.VersionsResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -292,7 +293,7 @@ public class InternalEngineTests extends ESTestCase { private static ParsedDocument testParsedDocument(String id, String type, String routing, Document document, BytesReference source, Mapping mappingUpdate) { Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); @@ -1369,19 +1370,10 @@ public class InternalEngineTests extends ESTestCase { public void testOutOfOrderDocsOnReplica() throws IOException { final List ops = generateSingleDocHistory(true, - randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20); + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); assertOpsOnReplica(ops, replicaEngine, true); } - public void testNonStandardVersioningOnReplica() throws IOException { - // TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order - // is detected using seq# - final List ops = generateSingleDocHistory(true, - randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine, false); - } - - public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException { IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", 
Settings.builder() .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us @@ -3601,9 +3593,17 @@ public class InternalEngineTests extends ESTestCase { */ private Tuple getSequenceID(Engine engine, Engine.Get get) throws EngineException { try (Searcher searcher = engine.acquireSearcher("get")) { - long seqNum = VersionsResolver.loadSeqNo(searcher.reader(), get.uid()); - long primaryTerm = VersionsResolver.loadPrimaryTerm(searcher.reader(), get.uid()); - return new Tuple<>(seqNum, primaryTerm); + final long primaryTerm; + final long seqNo; + DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), get.uid()); + if (docIdAndSeqNo == null) { + primaryTerm = 0; + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } else { + seqNo = docIdAndSeqNo.seqNo; + primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo); + } + return new Tuple<>(seqNo, primaryTerm); } catch (Exception e) { throw new EngineException(shardId, "unable to retrieve sequence id", e); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index 9161bc413c8..97799f8c46a 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -33,7 +33,7 @@ public class LiveVersionMapTests extends ESTestCase { for (int i = 0; i < 100000; ++i) { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong()); + VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); map.putUnderLock(uid.toBytesRef(), version); } long actualRamBytesUsed = RamUsageTester.sizeOf(map); @@ -48,7 +48,7 @@ public class LiveVersionMapTests extends ESTestCase { for (int i = 0; i < 100000; ++i) { 
BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong()); + VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); map.putUnderLock(uid.toBytesRef(), version); } actualRamBytesUsed = RamUsageTester.sizeOf(map); diff --git a/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java b/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java index 7af8ebc7580..3b953edece1 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java @@ -25,12 +25,12 @@ import org.elasticsearch.test.ESTestCase; public class VersionValueTests extends ESTestCase { public void testRamBytesUsed() { - VersionValue versionValue = new VersionValue(randomLong()); + VersionValue versionValue = new VersionValue(randomLong(), randomLong(), randomLong()); assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed()); } public void testDeleteRamBytesUsed() { - DeleteVersionValue versionValue = new DeleteVersionValue(randomLong(), randomLong()); + DeleteVersionValue versionValue = new DeleteVersionValue(randomLong(), randomLong(), randomLong(), randomLong()); assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed()); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 1d1af2b2fc5..c35f72d2085 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -27,13 +27,9 @@ import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import 
org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardResponse; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; -import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; @@ -98,6 +94,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } protected IndexMetaData buildIndexMetaData(int replicas) throws IOException { + return buildIndexMetaData(replicas, indexMapping); + } + + protected IndexMetaData buildIndexMetaData(int replicas, Map mappings) throws IOException { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -105,7 +105,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName()) .settings(settings) .primaryTerm(0, 1); - for (Map.Entry typeMapping : indexMapping.entrySet()) { + for (Map.Entry typeMapping : mappings.entrySet()) { metaData.putMapping(typeMapping.getKey(), typeMapping.getValue()); } return metaData.build(); @@ -224,15 +224,24 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase updateAllocationIDsOnPrimary(); } - public synchronized IndexShard addReplica() throws IOException { + public IndexShard addReplica() throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); final 
IndexShard replica = newShard(replicaRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(replicaRouting)); - replicas.add(replica); - updateAllocationIDsOnPrimary(); + addReplica(replica); return replica; } + public synchronized void addReplica(IndexShard replica) { + assert shardRoutings().stream() + .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : + "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; + replica.updatePrimaryTerm(primary.getPrimaryTerm()); + replicas.add(replica); + updateAllocationIDsOnPrimary(); + } + + public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException { final ShardRouting shardRouting = TestShardRouting.newShardRouting( shardId, @@ -264,6 +273,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } boolean found = replicas.remove(replica); assert found; + closeShards(primary); primary = replica; replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); updateAllocationIDsOnPrimary(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 7a11f89b73b..5f69370a5ca 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -18,6 +18,9 @@ */ package org.elasticsearch.index.replication; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -37,6 +40,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget; 
import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -152,4 +156,28 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase } } + public void testConflictingOpsOnReplica() throws Exception { + Map<String, String> mappings = + Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { + shards.startAll(); + IndexShard replica1 = shards.getReplicas().get(0); + logger.info("--> isolated replica " + replica1.routingEntry()); + shards.removeReplica(replica1); + IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON); + shards.index(indexRequest); + shards.addReplica(replica1); + logger.info("--> promoting replica to primary " + replica1.routingEntry()); + shards.promoteReplicaToPrimary(replica1); + indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"2\"}", XContentType.JSON); + shards.index(indexRequest); + shards.refresh("test"); + for (IndexShard shard : shards) { + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new TermQuery(new Term("f", "2")), 10); + assertEquals("shard " + shard.routingEntry() + " misses new version", 1, search.totalHits); + } + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index d203832fb15..fec0b766d34 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -107,7 +107,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { Mapping mappingUpdate) { Field uidField = new Field("_uid", Uid.createUid(type, id), 
UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b328e86e58d..629a8af3e0d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -551,7 +551,7 @@ public class IndexShardTests extends IndexShardTestCase { ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 846ca6be201..b7e20cf75c8 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -332,7 +332,7 @@ public class RefreshListenersTests extends ESTestCase { document.add(new TextField("test", testFieldValue, Field.Store.YES)); Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY); - SeqNoFieldMapper.SequenceID seqID = 
SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 6b2aa5e5921..ae430b72c4f 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2048,7 +2048,7 @@ public class TranslogTests extends ESTestCase { public void testTranslogOpSerialization() throws Exception { BytesReference B_1 = new BytesArray(new byte[]{1}); - SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); assert Version.CURRENT.major <= 6 : "Using UNASSIGNED_SEQ_NO can be removed in 7.0, because 6.0+ nodes have actual sequence numbers"; long randomSeqNum = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong(); long randomPrimaryTerm = randomBoolean() ? 
0 : randomNonNegativeLong(); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 743510e373c..40a92b11e73 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -204,7 +204,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { document.add(new TextField("test", "test", Field.Store.YES)); final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE); final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY); - final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID(); + final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); document.add(uidField); document.add(versionField); document.add(seqID.seqNo);