Refactor InternalEngine's index/delete flow for better clarity (#23711)
The InternalEngine Index/Delete methods (plus satellites like version loading from Lucene) have accumulated some cruft over the years making it hard to clearly the code flows for various use cases (primary indexing/recovery/replicas etc). This PR refactors those methods for better readability. The methods are broken up into smaller sub methods, albeit at the price of less code I reused. To support the refactoring I have considerably beefed up the versioning tests. This PR is a spin-off from #23543 , which made it clear this is needed.
This commit is contained in:
parent
c89fdd938e
commit
75b4f408e0
|
@ -29,7 +29,7 @@ import org.apache.lucene.index.TermsEnum;
|
|||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
|
||||
|
@ -51,49 +51,66 @@ final class PerThreadIDAndVersionLookup {
|
|||
private final TermsEnum termsEnum;
|
||||
/** _version data */
|
||||
private final NumericDocValues versions;
|
||||
|
||||
/** Reused for iteration (when the term exists) */
|
||||
private PostingsEnum docsEnum;
|
||||
|
||||
/** used for assertions to make sure class usage meets assumptions */
|
||||
private final Object readerKey;
|
||||
|
||||
/**
|
||||
* Initialize lookup for the provided segment
|
||||
*/
|
||||
PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
|
||||
TermsEnum termsEnum = null;
|
||||
NumericDocValues versions = null;
|
||||
|
||||
Fields fields = reader.fields();
|
||||
if (fields != null) {
|
||||
Terms terms = fields.terms(UidFieldMapper.NAME);
|
||||
if (terms != null) {
|
||||
termsEnum = terms.iterator();
|
||||
assert termsEnum != null;
|
||||
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
|
||||
assert versions != null;
|
||||
}
|
||||
Terms terms = fields.terms(UidFieldMapper.NAME);
|
||||
termsEnum = terms.iterator();
|
||||
if (termsEnum == null) {
|
||||
throw new IllegalArgumentException("reader misses the [" + UidFieldMapper.NAME +
|
||||
"] field");
|
||||
}
|
||||
|
||||
this.versions = versions;
|
||||
this.termsEnum = termsEnum;
|
||||
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
|
||||
if (versions == null) {
|
||||
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME +
|
||||
"] field");
|
||||
}
|
||||
Object readerKey = null;
|
||||
assert (readerKey = reader.getCoreCacheKey()) != null;
|
||||
this.readerKey = readerKey;
|
||||
}
|
||||
|
||||
/** Return null if id is not found. */
|
||||
public DocIdAndVersion lookup(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
|
||||
public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context)
|
||||
throws IOException {
|
||||
assert context.reader().getCoreCacheKey().equals(readerKey) :
|
||||
"context's reader is not the same as the reader class was initialized on.";
|
||||
int docID = getDocID(id, liveDocs);
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return new DocIdAndVersion(docID, versions.get(docID), context);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* returns the internal lucene doc id for the given id bytes.
|
||||
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
|
||||
* */
|
||||
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
|
||||
if (termsEnum.seekExact(id)) {
|
||||
int docID = DocIdSetIterator.NO_MORE_DOCS;
|
||||
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
|
||||
docsEnum = termsEnum.postings(docsEnum, 0);
|
||||
int docID = DocIdSetIterator.NO_MORE_DOCS;
|
||||
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
|
||||
if (liveDocs != null && liveDocs.get(d) == false) {
|
||||
continue;
|
||||
}
|
||||
docID = d;
|
||||
}
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return new DocIdAndVersion(docID, versions.get(docID), context);
|
||||
}
|
||||
return docID;
|
||||
} else {
|
||||
return DocIdSetIterator.NO_MORE_DOCS;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,32 +19,7 @@
|
|||
|
||||
package org.elasticsearch.common.lucene.uid;
|
||||
|
||||
import org.apache.lucene.index.Fields;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReader.CoreClosedListener;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
import org.apache.lucene.index.PostingsEnum;
|
||||
import org.apache.lucene.index.SortedNumericDocValues;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.index.Terms;
|
||||
import org.apache.lucene.index.TermsEnum;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.CloseableThreadLocal;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
|
||||
public class Versions {
|
||||
public final class Versions {
|
||||
|
||||
/** used to indicate the write operation should succeed regardless of current version **/
|
||||
public static final long MATCH_ANY = -3L;
|
||||
|
@ -59,210 +34,4 @@ public class Versions {
|
|||
* i.e., not found in the index and/or found as deleted (with version) in the version map
|
||||
*/
|
||||
public static final long MATCH_DELETED = -4L;
|
||||
|
||||
// TODO: is there somewhere else we can store these?
|
||||
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
|
||||
|
||||
// Evict this reader from lookupStates once it's closed:
|
||||
private static final CoreClosedListener removeLookupState = new CoreClosedListener() {
|
||||
@Override
|
||||
public void onClose(Object key) {
|
||||
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
|
||||
if (ctl != null) {
|
||||
ctl.close();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) throws IOException {
|
||||
Object key = reader.getCoreCacheKey();
|
||||
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
|
||||
if (ctl == null) {
|
||||
// First time we are seeing this reader's core; make a
|
||||
// new CTL:
|
||||
ctl = new CloseableThreadLocal<>();
|
||||
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(key, ctl);
|
||||
if (other == null) {
|
||||
// Our CTL won, we must remove it when the
|
||||
// core is closed:
|
||||
reader.addCoreClosedListener(removeLookupState);
|
||||
} else {
|
||||
// Another thread beat us to it: just use
|
||||
// their CTL:
|
||||
ctl = other;
|
||||
}
|
||||
}
|
||||
|
||||
PerThreadIDAndVersionLookup lookupState = ctl.get();
|
||||
if (lookupState == null) {
|
||||
lookupState = new PerThreadIDAndVersionLookup(reader);
|
||||
ctl.set(lookupState);
|
||||
}
|
||||
|
||||
return lookupState;
|
||||
}
|
||||
|
||||
private Versions() {
|
||||
}
|
||||
|
||||
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
|
||||
public static class DocIdAndVersion {
|
||||
public final int docId;
|
||||
public final long version;
|
||||
public final LeafReaderContext context;
|
||||
|
||||
public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
|
||||
this.docId = docId;
|
||||
this.version = version;
|
||||
this.context = context;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the internal doc ID and version for the uid from the reader, returning<ul>
|
||||
* <li>null if the uid wasn't found,
|
||||
* <li>a doc ID and a version otherwise
|
||||
* </ul>
|
||||
*/
|
||||
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
|
||||
assert term.field().equals(UidFieldMapper.NAME);
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
if (leaves.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
// which are likely to be in the last segments
|
||||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
LeafReaderContext context = leaves.get(i);
|
||||
LeafReader leaf = context.reader();
|
||||
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
|
||||
DocIdAndVersion result = lookup.lookup(term.bytes(), leaf.getLiveDocs(), context);
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the version for the uid from the reader, returning<ul>
|
||||
* <li>{@link #NOT_FOUND} if no matching doc exists,
|
||||
* <li>the version associated with the provided uid otherwise
|
||||
* </ul>
|
||||
*/
|
||||
public static long loadVersion(IndexReader reader, Term term) throws IOException {
|
||||
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
|
||||
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the sequence number for the given uid term, returning
|
||||
* {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found.
|
||||
*/
|
||||
public static long loadSeqNo(IndexReader reader, Term term) throws IOException {
|
||||
assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid";
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
if (leaves.isEmpty()) {
|
||||
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
// which are likely to be in the last segments
|
||||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
LeafReader leaf = leaves.get(i).reader();
|
||||
Bits liveDocs = leaf.getLiveDocs();
|
||||
|
||||
TermsEnum termsEnum = null;
|
||||
SortedNumericDocValues dvField = null;
|
||||
PostingsEnum docsEnum = null;
|
||||
|
||||
final Fields fields = leaf.fields();
|
||||
if (fields != null) {
|
||||
Terms terms = fields.terms(UidFieldMapper.NAME);
|
||||
if (terms != null) {
|
||||
termsEnum = terms.iterator();
|
||||
assert termsEnum != null;
|
||||
dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME);
|
||||
assert dvField != null;
|
||||
|
||||
final BytesRef id = term.bytes();
|
||||
if (termsEnum.seekExact(id)) {
|
||||
// there may be more than one matching docID, in the
|
||||
// case of nested docs, so we want the last one:
|
||||
docsEnum = termsEnum.postings(docsEnum, 0);
|
||||
int docID = DocIdSetIterator.NO_MORE_DOCS;
|
||||
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
|
||||
if (liveDocs != null && liveDocs.get(d) == false) {
|
||||
continue;
|
||||
}
|
||||
docID = d;
|
||||
}
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
dvField.setDocument(docID);
|
||||
assert dvField.count() == 1 : "expected only a single value for _seq_no but got " +
|
||||
dvField.count();
|
||||
return dvField.valueAt(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the primary term for the given uid term, returning {@code 0} if none is found.
|
||||
*/
|
||||
public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException {
|
||||
assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid";
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
if (leaves.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
// which are likely to be in the last segments
|
||||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
LeafReader leaf = leaves.get(i).reader();
|
||||
Bits liveDocs = leaf.getLiveDocs();
|
||||
|
||||
TermsEnum termsEnum = null;
|
||||
NumericDocValues dvField = null;
|
||||
PostingsEnum docsEnum = null;
|
||||
|
||||
final Fields fields = leaf.fields();
|
||||
if (fields != null) {
|
||||
Terms terms = fields.terms(UidFieldMapper.NAME);
|
||||
if (terms != null) {
|
||||
termsEnum = terms.iterator();
|
||||
assert termsEnum != null;
|
||||
dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
||||
assert dvField != null;
|
||||
|
||||
final BytesRef id = term.bytes();
|
||||
if (termsEnum.seekExact(id)) {
|
||||
// there may be more than one matching docID, in the
|
||||
// case of nested docs, so we want the last one:
|
||||
docsEnum = termsEnum.postings(docsEnum, 0);
|
||||
int docID = DocIdSetIterator.NO_MORE_DOCS;
|
||||
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
|
||||
if (liveDocs != null && liveDocs.get(d) == false) {
|
||||
continue;
|
||||
}
|
||||
docID = d;
|
||||
}
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return dvField.get(docID);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,263 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.lucene.uid;
|
||||
|
||||
import org.apache.lucene.index.Fields;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReader.CoreClosedListener;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
import org.apache.lucene.index.PostingsEnum;
|
||||
import org.apache.lucene.index.SortedNumericDocValues;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.index.Terms;
|
||||
import org.apache.lucene.index.TermsEnum;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.CloseableThreadLocal;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
|
||||
|
||||
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
|
||||
public class VersionsResolver {
|
||||
|
||||
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>>
|
||||
lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
|
||||
|
||||
// Evict this reader from lookupStates once it's closed:
|
||||
private static final CoreClosedListener removeLookupState = key -> {
|
||||
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
|
||||
if (ctl != null) {
|
||||
ctl.close();
|
||||
}
|
||||
};
|
||||
|
||||
private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader)
|
||||
throws IOException {
|
||||
Object key = reader.getCoreCacheKey();
|
||||
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
|
||||
if (ctl == null) {
|
||||
// First time we are seeing this reader's core; make a
|
||||
// new CTL:
|
||||
ctl = new CloseableThreadLocal<>();
|
||||
CloseableThreadLocal<PerThreadIDAndVersionLookup> other =
|
||||
lookupStates.putIfAbsent(key, ctl);
|
||||
if (other == null) {
|
||||
// Our CTL won, we must remove it when the
|
||||
// core is closed:
|
||||
reader.addCoreClosedListener(removeLookupState);
|
||||
} else {
|
||||
// Another thread beat us to it: just use
|
||||
// their CTL:
|
||||
ctl = other;
|
||||
}
|
||||
}
|
||||
|
||||
PerThreadIDAndVersionLookup lookupState = ctl.get();
|
||||
if (lookupState == null) {
|
||||
lookupState = new PerThreadIDAndVersionLookup(reader);
|
||||
ctl.set(lookupState);
|
||||
}
|
||||
|
||||
return lookupState;
|
||||
}
|
||||
|
||||
private VersionsResolver() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and
|
||||
* a version.
|
||||
**/
|
||||
public static class DocIdAndVersion {
|
||||
public final int docId;
|
||||
public final long version;
|
||||
public final LeafReaderContext context;
|
||||
|
||||
public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
|
||||
this.docId = docId;
|
||||
this.version = version;
|
||||
this.context = context;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the internal doc ID and version for the uid from the reader, returning<ul>
|
||||
* <li>null if the uid wasn't found,
|
||||
* <li>a doc ID and a version otherwise
|
||||
* </ul>
|
||||
*/
|
||||
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
|
||||
throws IOException {
|
||||
assert term.field().equals(UidFieldMapper.NAME);
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
if (leaves.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
// which are likely to be in the last segments
|
||||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
LeafReaderContext context = leaves.get(i);
|
||||
LeafReader leaf = context.reader();
|
||||
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
|
||||
DocIdAndVersion result =
|
||||
lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the version for the uid from the reader, returning<ul>
|
||||
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
|
||||
* <li>the version associated with the provided uid otherwise
|
||||
* </ul>
|
||||
*/
|
||||
public static long loadVersion(IndexReader reader, Term term) throws IOException {
|
||||
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
|
||||
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the sequence number for the given uid term, returning
|
||||
* {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found.
|
||||
*/
|
||||
public static long loadSeqNo(IndexReader reader, Term term) throws IOException {
|
||||
assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid";
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
if (leaves.isEmpty()) {
|
||||
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
// which are likely to be in the last segments
|
||||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
LeafReader leaf = leaves.get(i).reader();
|
||||
Bits liveDocs = leaf.getLiveDocs();
|
||||
|
||||
TermsEnum termsEnum = null;
|
||||
SortedNumericDocValues dvField = null;
|
||||
PostingsEnum docsEnum = null;
|
||||
|
||||
final Fields fields = leaf.fields();
|
||||
if (fields != null) {
|
||||
Terms terms = fields.terms(UidFieldMapper.NAME);
|
||||
if (terms != null) {
|
||||
termsEnum = terms.iterator();
|
||||
assert termsEnum != null;
|
||||
dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME);
|
||||
assert dvField != null;
|
||||
|
||||
final BytesRef id = term.bytes();
|
||||
if (termsEnum.seekExact(id)) {
|
||||
// there may be more than one matching docID, in the
|
||||
// case of nested docs, so we want the last one:
|
||||
docsEnum = termsEnum.postings(docsEnum, 0);
|
||||
int docID = DocIdSetIterator.NO_MORE_DOCS;
|
||||
for (int d = docsEnum.nextDoc();
|
||||
d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
|
||||
if (liveDocs != null && liveDocs.get(d) == false) {
|
||||
continue;
|
||||
}
|
||||
docID = d;
|
||||
}
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
dvField.setDocument(docID);
|
||||
assert dvField.count() == 1 :
|
||||
"expected only a single value for _seq_no but got " +
|
||||
dvField.count();
|
||||
return dvField.valueAt(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the primary term for the given uid term, returning {@code 0} if none is found.
|
||||
*/
|
||||
public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException {
|
||||
assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid";
|
||||
List<LeafReaderContext> leaves = reader.leaves();
|
||||
if (leaves.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
// which are likely to be in the last segments
|
||||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
LeafReader leaf = leaves.get(i).reader();
|
||||
Bits liveDocs = leaf.getLiveDocs();
|
||||
|
||||
TermsEnum termsEnum = null;
|
||||
NumericDocValues dvField = null;
|
||||
PostingsEnum docsEnum = null;
|
||||
|
||||
final Fields fields = leaf.fields();
|
||||
if (fields != null) {
|
||||
Terms terms = fields.terms(UidFieldMapper.NAME);
|
||||
if (terms != null) {
|
||||
termsEnum = terms.iterator();
|
||||
assert termsEnum != null;
|
||||
dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
||||
assert dvField != null;
|
||||
|
||||
final BytesRef id = term.bytes();
|
||||
if (termsEnum.seekExact(id)) {
|
||||
// there may be more than one matching docID, in the
|
||||
// case of nested docs, so we want the last one:
|
||||
docsEnum = termsEnum.postings(docsEnum, 0);
|
||||
int docID = DocIdSetIterator.NO_MORE_DOCS;
|
||||
for (int d = docsEnum.nextDoc();
|
||||
d != DocIdSetIterator.NO_MORE_DOCS;
|
||||
d = docsEnum.nextDoc()) {
|
||||
if (liveDocs != null && liveDocs.get(d) == false) {
|
||||
continue;
|
||||
}
|
||||
docID = d;
|
||||
}
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return dvField.get(docID);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -35,12 +35,12 @@ class DeleteVersionValue extends VersionValue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long time() {
|
||||
public long getTime() {
|
||||
return this.time;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete() {
|
||||
public boolean isDelete() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -52,8 +52,8 @@ class DeleteVersionValue extends VersionValue {
|
|||
@Override
|
||||
public String toString() {
|
||||
return "DeleteVersionValue{" +
|
||||
"version=" + version() + ", " +
|
||||
"time=" + time +
|
||||
"version=" + getVersion() +
|
||||
",time=" + time +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,8 @@ import org.elasticsearch.common.lease.Releasables;
|
|||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsResolver;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
|
@ -419,9 +421,9 @@ public abstract class Engine implements Closeable {
|
|||
this.found = found;
|
||||
}
|
||||
|
||||
public DeleteResult(Exception failure, long version, long seqNo) {
|
||||
public DeleteResult(Exception failure, long version, long seqNo, boolean found) {
|
||||
super(Operation.TYPE.DELETE, failure, version, seqNo);
|
||||
this.found = false;
|
||||
this.found = found;
|
||||
}
|
||||
|
||||
public boolean isFound() {
|
||||
|
@ -460,9 +462,9 @@ public abstract class Engine implements Closeable {
|
|||
|
||||
protected final GetResult getFromSearcher(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
|
||||
final Searcher searcher = searcherFactory.apply("get");
|
||||
final Versions.DocIdAndVersion docIdAndVersion;
|
||||
final DocIdAndVersion docIdAndVersion;
|
||||
try {
|
||||
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
|
||||
docIdAndVersion = VersionsResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
|
||||
} catch (Exception e) {
|
||||
Releasables.closeWhileHandlingException(searcher);
|
||||
//TODO: A better exception goes here
|
||||
|
@ -1037,9 +1039,10 @@ public abstract class Engine implements Closeable {
|
|||
} // TEST ONLY
|
||||
|
||||
Index(Term uid, ParsedDocument doc, long version) {
|
||||
this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, VersionType.INTERNAL,
|
||||
// use a primary term of 2 to allow tests to reduce it to a valid >0 term
|
||||
this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 2, version, VersionType.INTERNAL,
|
||||
Origin.PRIMARY, System.nanoTime(), -1, false);
|
||||
}
|
||||
} // TEST ONLY
|
||||
|
||||
public ParsedDocument parsedDoc() {
|
||||
return this.doc;
|
||||
|
@ -1227,12 +1230,12 @@ public abstract class Engine implements Closeable {
|
|||
public static class GetResult implements Releasable {
|
||||
private final boolean exists;
|
||||
private final long version;
|
||||
private final Versions.DocIdAndVersion docIdAndVersion;
|
||||
private final DocIdAndVersion docIdAndVersion;
|
||||
private final Searcher searcher;
|
||||
|
||||
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);
|
||||
|
||||
private GetResult(boolean exists, long version, Versions.DocIdAndVersion docIdAndVersion, Searcher searcher) {
|
||||
private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Searcher searcher) {
|
||||
this.exists = exists;
|
||||
this.version = version;
|
||||
this.docIdAndVersion = docIdAndVersion;
|
||||
|
@ -1242,7 +1245,7 @@ public abstract class Engine implements Closeable {
|
|||
/**
|
||||
* Build a non-realtime get result from the searcher.
|
||||
*/
|
||||
public GetResult(Searcher searcher, Versions.DocIdAndVersion docIdAndVersion) {
|
||||
public GetResult(Searcher searcher, DocIdAndVersion docIdAndVersion) {
|
||||
this(true, docIdAndVersion.version, docIdAndVersion, searcher);
|
||||
}
|
||||
|
||||
|
@ -1258,7 +1261,7 @@ public abstract class Engine implements Closeable {
|
|||
return this.searcher;
|
||||
}
|
||||
|
||||
public Versions.DocIdAndVersion docIdAndVersion() {
|
||||
public DocIdAndVersion docIdAndVersion() {
|
||||
return docIdAndVersion;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.common.lucene.LoggerInfoStream;
|
|||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsResolver;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
@ -385,13 +386,13 @@ public class InternalEngine extends Engine {
|
|||
if (get.realtime()) {
|
||||
VersionValue versionValue = versionMap.getUnderLock(get.uid());
|
||||
if (versionValue != null) {
|
||||
if (versionValue.delete()) {
|
||||
if (versionValue.isDelete()) {
|
||||
return GetResult.NOT_EXISTS;
|
||||
}
|
||||
if (get.versionType().isVersionConflictForReads(versionValue.version(), get.version())) {
|
||||
if (get.versionType().isVersionConflictForReads(versionValue.getVersion(), get.version())) {
|
||||
Uid uid = Uid.createUid(get.uid().text());
|
||||
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
|
||||
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
|
||||
get.versionType().explainConflictForReads(versionValue.getVersion(), get.version()));
|
||||
}
|
||||
refresh("realtime_get");
|
||||
}
|
||||
|
@ -403,55 +404,46 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
/**
|
||||
* Checks for version conflicts. If a non-critical version conflict exists <code>true</code> is returned. In the case of a critical
|
||||
* version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown.
|
||||
*
|
||||
* @param op the operation
|
||||
* @param currentVersion the current version
|
||||
* @param expectedVersion the expected version
|
||||
* @param deleted {@code true} if the current version is not found or represents a delete
|
||||
* @return <code>true</code> iff a non-critical version conflict (origin recovery or replica) is found otherwise <code>false</code>
|
||||
* @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary
|
||||
* @throws IllegalArgumentException if an unsupported version type is used.
|
||||
* the status of the current doc version in lucene, compared to the version in an incoming
|
||||
* operation
|
||||
*/
|
||||
private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) {
|
||||
if (op.versionType() == VersionType.FORCE) {
|
||||
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
|
||||
// If index was created in 5.0 or later, 'force' is not allowed at all
|
||||
throw new IllegalArgumentException("version type [FORCE] may not be used for indices created after 6.0");
|
||||
} else if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
// For earlier indices, 'force' is only allowed for translog recovery
|
||||
throw new IllegalArgumentException("version type [FORCE] may not be used for non-translog operations");
|
||||
}
|
||||
}
|
||||
|
||||
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
|
||||
if (op.origin() == Operation.Origin.PRIMARY) {
|
||||
// fatal version conflict
|
||||
throw new VersionConflictEngineException(
|
||||
shardId,
|
||||
op.type(),
|
||||
op.id(),
|
||||
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
||||
|
||||
} else {
|
||||
/* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
|
||||
* successful result.*/
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
enum OpVsLuceneDocStatus {
|
||||
/** the op is more recent than the one that last modified the doc found in lucene*/
|
||||
OP_NEWER,
|
||||
/** the op is older or the same as the one that last modified the doc found in lucene*/
|
||||
OP_STALE_OR_EQUAL,
|
||||
/** no doc was found in lucene */
|
||||
LUCENE_DOC_NOT_FOUND
|
||||
}
|
||||
|
||||
private long checkDeletedAndGCed(VersionValue versionValue) {
|
||||
long currentVersion;
|
||||
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
|
||||
currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
||||
} else {
|
||||
currentVersion = versionValue.version();
|
||||
/** resolves the current version of the document, returning null if not found */
|
||||
private VersionValue resolveDocVersion(final Operation op) throws IOException {
|
||||
assert incrementVersionLookup(); // used for asserting in tests
|
||||
VersionValue versionValue = versionMap.getUnderLock(op.uid());
|
||||
if (versionValue == null) {
|
||||
assert incrementIndexVersionLookup(); // used for asserting in tests
|
||||
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
|
||||
if (currentVersion != Versions.NOT_FOUND) {
|
||||
versionValue = new VersionValue(currentVersion);
|
||||
}
|
||||
} else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() &&
|
||||
(engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.getTime()) >
|
||||
getGcDeletesInMillis()) {
|
||||
versionValue = null;
|
||||
}
|
||||
return versionValue;
|
||||
}
|
||||
|
||||
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation op)
|
||||
throws IOException {
|
||||
assert op.version() >= 0 : "versions should be non-negative. got " + op.version();
|
||||
final VersionValue versionValue = resolveDocVersion(op);
|
||||
if (versionValue == null) {
|
||||
return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
|
||||
} else {
|
||||
return op.version() > versionValue.getVersion() ?
|
||||
OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
|
||||
}
|
||||
return currentVersion;
|
||||
}
|
||||
|
||||
private boolean canOptimizeAddDocument(Index index) {
|
||||
|
@ -492,7 +484,7 @@ public class InternalEngine extends Engine {
|
|||
return true;
|
||||
}
|
||||
|
||||
private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) && origin == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
// legacy support
|
||||
assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" +
|
||||
|
@ -507,20 +499,29 @@ public class InternalEngine extends Engine {
|
|||
return true;
|
||||
}
|
||||
|
||||
private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED) ||
|
||||
origin == Operation.Origin.PRIMARY) {
|
||||
// sequence number should be set when operation origin is primary or when all shards are on new nodes
|
||||
assert seqNo >= 0 : "ops should have an assigned seq no.; origin: " + origin;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public IndexResult index(Index index) throws IOException {
|
||||
final boolean doThrottle = index.origin().isRecovery() == false;
|
||||
try (ReleasableLock releasableLock = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
assert assertSequenceNumber(index.origin(), index.seqNo());
|
||||
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
|
||||
assert assertVersionType(index);
|
||||
final Translog.Location location;
|
||||
long seqNo = index.seqNo();
|
||||
try (Releasable ignored = acquireLock(index.uid());
|
||||
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
|
||||
lastWriteNanos = index.startTime();
|
||||
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
|
||||
* and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
|
||||
/* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
|
||||
* if we have an autoGeneratedID that comes into the engine we can potentially optimize
|
||||
* and just use addDocument instead of updateDocument and skip the entire version and index lookupVersion across the board.
|
||||
* Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
|
||||
* to detect if it has potentially been added before. We use the documents timestamp for this since it's something
|
||||
* that:
|
||||
|
@ -543,62 +544,37 @@ public class InternalEngine extends Engine {
|
|||
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
|
||||
* updateDocument.
|
||||
*/
|
||||
long currentVersion;
|
||||
final boolean deleted;
|
||||
// if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
|
||||
// lucene index without checking the version map but we still do the version check
|
||||
final boolean forceUpdateDocument;
|
||||
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
|
||||
if (canOptimizeAddDocument) {
|
||||
forceUpdateDocument = isForceUpdateDocument(index);
|
||||
currentVersion = Versions.NOT_FOUND;
|
||||
deleted = true;
|
||||
final IndexingStrategy plan;
|
||||
|
||||
if (index.origin() == Operation.Origin.PRIMARY) {
|
||||
plan = planIndexingAsPrimary(index);
|
||||
} else {
|
||||
// update the document
|
||||
forceUpdateDocument = false; // we don't force it - it depends on the version
|
||||
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
|
||||
assert incrementVersionLookup();
|
||||
if (versionValue == null) {
|
||||
currentVersion = loadCurrentVersionFromIndex(index.uid());
|
||||
deleted = currentVersion == Versions.NOT_FOUND;
|
||||
} else {
|
||||
currentVersion = checkDeletedAndGCed(versionValue);
|
||||
deleted = versionValue.delete();
|
||||
}
|
||||
}
|
||||
final long expectedVersion = index.version();
|
||||
Optional<IndexResult> resultOnVersionConflict;
|
||||
try {
|
||||
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
|
||||
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
|
||||
: Optional.empty();
|
||||
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
|
||||
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
|
||||
// non-primary mode (i.e., replica or recovery)
|
||||
plan = planIndexingAsNonPrimary(index);
|
||||
}
|
||||
|
||||
final IndexResult indexResult;
|
||||
if (resultOnVersionConflict.isPresent()) {
|
||||
indexResult = resultOnVersionConflict.get();
|
||||
if (plan.earlyResultOnPreFlightError.isPresent()) {
|
||||
indexResult = plan.earlyResultOnPreFlightError.get();
|
||||
assert indexResult.hasFailure();
|
||||
} else if (plan.indexIntoLucene) {
|
||||
indexResult = indexIntoLucene(index, plan);
|
||||
} else {
|
||||
// no version conflict
|
||||
if (index.origin() == Operation.Origin.PRIMARY) {
|
||||
seqNo = seqNoService().generateSeqNo();
|
||||
}
|
||||
indexResult = indexIntoLucene(index, seqNo, currentVersion, deleted, forceUpdateDocument, canOptimizeAddDocument, expectedVersion);
|
||||
indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing,
|
||||
plan.currentNotFoundOrDeleted);
|
||||
}
|
||||
if (indexResult.hasFailure() == false) {
|
||||
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
|
||||
? translog.add(new Translog.Index(index, indexResult))
|
||||
: null;
|
||||
if (indexResult.hasFailure() == false &&
|
||||
index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
Translog.Location location =
|
||||
translog.add(new Translog.Index(index, indexResult));
|
||||
indexResult.setTranslogLocation(location);
|
||||
}
|
||||
if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
|
||||
seqNoService().markSeqNoAsCompleted(indexResult.getSeqNo());
|
||||
}
|
||||
indexResult.setTook(System.nanoTime() - index.startTime());
|
||||
indexResult.freeze();
|
||||
return indexResult;
|
||||
} finally {
|
||||
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
|
||||
seqNoService().markSeqNoAsCompleted(seqNo);
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException | IOException e) {
|
||||
try {
|
||||
|
@ -610,24 +586,94 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
private IndexResult indexIntoLucene(Index index, long seqNo, long currentVersion, boolean deleted, boolean forceUpdateDocument, boolean canOptimizeAddDocument, long expectedVersion) throws IOException {
|
||||
private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
|
||||
final IndexingStrategy plan;
|
||||
if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
|
||||
// no need to deal with out of order delivery - we never saw this one
|
||||
assert index.version() == 1L :
|
||||
"can optimize on replicas but incoming version is [" + index.version() + "]";
|
||||
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
|
||||
} else {
|
||||
// drop out of order operations
|
||||
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
|
||||
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
|
||||
+ index.versionType() + "]";
|
||||
// unlike the primary, replicas don't really care to about creation status of documents
|
||||
// this allows to ignore the case where a document was found in the live version maps in
|
||||
// a delete state and return false for the created flag in favor of code simplicity
|
||||
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
|
||||
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
|
||||
plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
|
||||
} else {
|
||||
plan = IndexingStrategy.processNormally(
|
||||
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()
|
||||
);
|
||||
}
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
|
||||
assert index.origin() == Operation.Origin.PRIMARY :
|
||||
"planing as primary but origin isn't. got " + index.origin();
|
||||
final IndexingStrategy plan;
|
||||
// resolve an external operation into an internal one which is safe to replay
|
||||
if (canOptimizeAddDocument(index)) {
|
||||
if (mayHaveBeenIndexedBefore(index)) {
|
||||
plan = IndexingStrategy.overrideExistingAsIfNotThere(seqNoService().generateSeqNo(), 1L);
|
||||
} else {
|
||||
plan = IndexingStrategy.optimizedAppendOnly(seqNoService().generateSeqNo());
|
||||
}
|
||||
} else {
|
||||
// resolves incoming version
|
||||
final VersionValue versionValue = resolveDocVersion(index);
|
||||
final long currentVersion;
|
||||
final boolean currentNotFoundOrDeleted;
|
||||
if (versionValue == null) {
|
||||
currentVersion = Versions.NOT_FOUND;
|
||||
currentNotFoundOrDeleted = true;
|
||||
} else {
|
||||
currentVersion = versionValue.getVersion();
|
||||
currentNotFoundOrDeleted = versionValue.isDelete();
|
||||
}
|
||||
if (index.versionType().isVersionConflictForWrites(
|
||||
currentVersion, index.version(), currentNotFoundOrDeleted)) {
|
||||
plan = IndexingStrategy.skipDueToVersionConflict(
|
||||
new VersionConflictEngineException(shardId, index, currentVersion,
|
||||
currentNotFoundOrDeleted),
|
||||
currentNotFoundOrDeleted, currentVersion);
|
||||
} else {
|
||||
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
|
||||
seqNoService().generateSeqNo(),
|
||||
index.versionType().updateVersion(currentVersion, index.version())
|
||||
);
|
||||
}
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
|
||||
throws IOException {
|
||||
assert assertSequenceNumberBeforeIndexing(index.origin(), plan.seqNoForIndexing);
|
||||
assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
|
||||
assert plan.indexIntoLucene;
|
||||
/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
|
||||
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
|
||||
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
|
||||
*/
|
||||
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
|
||||
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
|
||||
index.parsedDoc().version().setLongValue(updatedVersion);
|
||||
index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm());
|
||||
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
|
||||
try {
|
||||
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
|
||||
// document does not exists, we can optimize for create, but double check if assertions are running
|
||||
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
|
||||
index(index.docs(), indexWriter);
|
||||
} else {
|
||||
if (plan.useLuceneUpdateDocument) {
|
||||
update(index.uid(), index.docs(), indexWriter);
|
||||
} else {
|
||||
// document does not exists, we can optimize for create, but double check if assertions are running
|
||||
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
|
||||
index(index.docs(), indexWriter);
|
||||
}
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
|
||||
return new IndexResult(updatedVersion, seqNo, deleted);
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(plan.versionForIndexing));
|
||||
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing,
|
||||
plan.currentNotFoundOrDeleted);
|
||||
} catch (Exception ex) {
|
||||
if (indexWriter.getTragicException() == null) {
|
||||
/* There is no tragic event recorded so this must be a document failure.
|
||||
|
@ -639,19 +685,29 @@ public class InternalEngine extends Engine {
|
|||
* Bottom line is that we can only rely on the fact that if it's a document failure then
|
||||
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
|
||||
* non-document failure
|
||||
*
|
||||
* we return a `MATCH_ANY` version to indicate no document was index. The value is
|
||||
* not used anyway
|
||||
*/
|
||||
return new IndexResult(ex, currentVersion, index.seqNo());
|
||||
return new IndexResult(ex, Versions.MATCH_ANY, index.seqNo());
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isForceUpdateDocument(Index index) {
|
||||
boolean forceUpdateDocument;
|
||||
/**
|
||||
* returns true if the indexing operation may have already be processed by this engine.
|
||||
* Note that it is OK to rarely return true even if this is not the case. However a `false`
|
||||
* return value must always be correct.
|
||||
*
|
||||
*/
|
||||
private boolean mayHaveBeenIndexedBefore(Index index) {
|
||||
assert canOptimizeAddDocument(index);
|
||||
boolean mayHaveBeenIndexBefore;
|
||||
long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
|
||||
if (index.isRetry()) {
|
||||
forceUpdateDocument = true;
|
||||
mayHaveBeenIndexBefore = true;
|
||||
do {
|
||||
deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
|
||||
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
|
||||
|
@ -662,9 +718,9 @@ public class InternalEngine extends Engine {
|
|||
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
|
||||
} else {
|
||||
// in this case we force
|
||||
forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
|
||||
mayHaveBeenIndexBefore = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
|
||||
}
|
||||
return forceUpdateDocument;
|
||||
return mayHaveBeenIndexBefore;
|
||||
}
|
||||
|
||||
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
|
||||
|
@ -675,13 +731,70 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
private static final class IndexingStrategy {
|
||||
final boolean currentNotFoundOrDeleted;
|
||||
final boolean useLuceneUpdateDocument;
|
||||
final long seqNoForIndexing;
|
||||
final long versionForIndexing;
|
||||
final boolean indexIntoLucene;
|
||||
final Optional<IndexResult> earlyResultOnPreFlightError;
|
||||
|
||||
private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument,
|
||||
boolean indexIntoLucene, long seqNoForIndexing,
|
||||
long versionForIndexing, IndexResult earlyResultOnPreFlightError) {
|
||||
assert useLuceneUpdateDocument == false || indexIntoLucene :
|
||||
"use lucene update is set to true, but we're not indexing into lucene";
|
||||
assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false :
|
||||
"can only index into lucene or have a preflight result but not both." +
|
||||
"indexIntoLucene: " + indexIntoLucene
|
||||
+ " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError;
|
||||
this.currentNotFoundOrDeleted = currentNotFoundOrDeleted;
|
||||
this.useLuceneUpdateDocument = useLuceneUpdateDocument;
|
||||
this.seqNoForIndexing = seqNoForIndexing;
|
||||
this.versionForIndexing = versionForIndexing;
|
||||
this.indexIntoLucene = indexIntoLucene;
|
||||
this.earlyResultOnPreFlightError =
|
||||
earlyResultOnPreFlightError == null ? Optional.empty() :
|
||||
Optional.of(earlyResultOnPreFlightError);
|
||||
}
|
||||
|
||||
static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
|
||||
return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null);
|
||||
}
|
||||
|
||||
static IndexingStrategy skipDueToVersionConflict(VersionConflictEngineException e,
|
||||
boolean currentNotFoundOrDeleted,
|
||||
long currentVersion) {
|
||||
return new IndexingStrategy(currentNotFoundOrDeleted, false,
|
||||
false, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND,
|
||||
new IndexResult(e, currentVersion));
|
||||
}
|
||||
|
||||
static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
|
||||
long seqNoForIndexing, long versionForIndexing) {
|
||||
return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false,
|
||||
true, seqNoForIndexing, versionForIndexing, null);
|
||||
}
|
||||
|
||||
static IndexingStrategy overrideExistingAsIfNotThere(
|
||||
long seqNoForIndexing, long versionForIndexing) {
|
||||
return new IndexingStrategy(true, true, true, seqNoForIndexing, versionForIndexing, null);
|
||||
}
|
||||
|
||||
static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted,
|
||||
long seqNoForIndexing, long versionForIndexing) {
|
||||
return new IndexingStrategy(currentNotFoundOrDeleted, false,
|
||||
false, seqNoForIndexing, versionForIndexing, null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the doc in the index operation really doesn't exist
|
||||
*/
|
||||
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
|
||||
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
|
||||
if (versionValue != null) {
|
||||
if (versionValue.delete() == false || allowDeleted == false) {
|
||||
if (versionValue.isDelete() == false || allowDeleted == false) {
|
||||
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
|
||||
}
|
||||
} else {
|
||||
|
@ -705,12 +818,39 @@ public class InternalEngine extends Engine {
|
|||
|
||||
@Override
|
||||
public DeleteResult delete(Delete delete) throws IOException {
|
||||
DeleteResult result;
|
||||
try (ReleasableLock ignored = readLock.acquire()) {
|
||||
assert assertVersionType(delete);
|
||||
assert assertVersionType(delete);
|
||||
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
|
||||
final DeleteResult deleteResult;
|
||||
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
|
||||
try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) {
|
||||
ensureOpen();
|
||||
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
|
||||
result = innerDelete(delete);
|
||||
lastWriteNanos = delete.startTime();
|
||||
final DeletionStrategy plan;
|
||||
if (delete.origin() == Operation.Origin.PRIMARY) {
|
||||
plan = planDeletionAsPrimary(delete);
|
||||
} else {
|
||||
plan = planDeletionAsNonPrimary(delete);
|
||||
}
|
||||
|
||||
if (plan.earlyResultOnPreflightError.isPresent()) {
|
||||
deleteResult = plan.earlyResultOnPreflightError.get();
|
||||
} else if (plan.deleteFromLucene) {
|
||||
deleteResult = deleteInLucene(delete, plan);
|
||||
} else {
|
||||
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion,
|
||||
plan.currentlyDeleted == false);
|
||||
}
|
||||
if (!deleteResult.hasFailure() &&
|
||||
delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
Translog.Location location =
|
||||
translog.add(new Translog.Delete(delete, deleteResult));
|
||||
deleteResult.setTranslogLocation(location);
|
||||
}
|
||||
if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
|
||||
seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo());
|
||||
}
|
||||
deleteResult.setTook(System.nanoTime() - delete.startTime());
|
||||
deleteResult.freeze();
|
||||
} catch (RuntimeException | IOException e) {
|
||||
try {
|
||||
maybeFailEngine("index", e);
|
||||
|
@ -720,7 +860,128 @@ public class InternalEngine extends Engine {
|
|||
throw e;
|
||||
}
|
||||
maybePruneDeletedTombstones();
|
||||
return result;
|
||||
return deleteResult;
|
||||
}
|
||||
|
||||
private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
|
||||
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got "
|
||||
+ delete.origin();
|
||||
// drop out of order operations
|
||||
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
|
||||
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
|
||||
+ delete.versionType() + "]";
|
||||
// unlike the primary, replicas don't really care to about found status of documents
|
||||
// this allows to ignore the case where a document was found in the live version maps in
|
||||
// a delete state and return true for the found flag in favor of code simplicity
|
||||
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
|
||||
|
||||
final DeletionStrategy plan;
|
||||
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
|
||||
plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version());
|
||||
} else {
|
||||
plan = DeletionStrategy.processNormally(
|
||||
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
|
||||
delete.seqNo(), delete.version());
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
|
||||
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got "
|
||||
+ delete.origin();
|
||||
// resolve operation from external to internal
|
||||
final VersionValue versionValue = resolveDocVersion(delete);
|
||||
assert incrementVersionLookup();
|
||||
final long currentVersion;
|
||||
final boolean currentlyDeleted;
|
||||
if (versionValue == null) {
|
||||
currentVersion = Versions.NOT_FOUND;
|
||||
currentlyDeleted = true;
|
||||
} else {
|
||||
currentVersion = versionValue.getVersion();
|
||||
currentlyDeleted = versionValue.isDelete();
|
||||
}
|
||||
final DeletionStrategy plan;
|
||||
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
|
||||
plan = DeletionStrategy.skipDueToVersionConflict(
|
||||
new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted),
|
||||
currentVersion, currentlyDeleted);
|
||||
} else {
|
||||
plan = DeletionStrategy.processNormally(currentlyDeleted,
|
||||
seqNoService().generateSeqNo(),
|
||||
delete.versionType().updateVersion(currentVersion, delete.version()));
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
|
||||
throws IOException {
|
||||
try {
|
||||
if (plan.currentlyDeleted == false) {
|
||||
// any exception that comes from this is a either an ACE or a fatal exception there
|
||||
// can't be any document failures coming from this
|
||||
indexWriter.deleteDocuments(delete.uid());
|
||||
}
|
||||
versionMap.putUnderLock(delete.uid().bytes(),
|
||||
new DeleteVersionValue(plan.versionOfDeletion,
|
||||
engineConfig.getThreadPool().relativeTimeInMillis()));
|
||||
return new DeleteResult(
|
||||
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
|
||||
} catch (Exception ex) {
|
||||
if (indexWriter.getTragicException() == null) {
|
||||
// there is no tragic event and such it must be a document level failure
|
||||
return new DeleteResult(ex, plan.versionOfDeletion, plan.versionOfDeletion,
|
||||
plan.currentlyDeleted == false);
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DeletionStrategy {
|
||||
// of a rare double delete
|
||||
final boolean deleteFromLucene;
|
||||
final boolean currentlyDeleted;
|
||||
final long seqNoOfDeletion;
|
||||
final long versionOfDeletion;
|
||||
final Optional<DeleteResult> earlyResultOnPreflightError;
|
||||
|
||||
private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted,
|
||||
long seqNoOfDeletion, long versionOfDeletion,
|
||||
DeleteResult earlyResultOnPreflightError) {
|
||||
assert (deleteFromLucene && earlyResultOnPreflightError != null) == false :
|
||||
"can only delete from lucene or have a preflight result but not both." +
|
||||
"deleteFromLucene: " + deleteFromLucene
|
||||
+ " earlyResultOnPreFlightError:" + earlyResultOnPreflightError;
|
||||
this.deleteFromLucene = deleteFromLucene;
|
||||
this.currentlyDeleted = currentlyDeleted;
|
||||
this.seqNoOfDeletion = seqNoOfDeletion;
|
||||
this.versionOfDeletion = versionOfDeletion;
|
||||
this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ?
|
||||
Optional.empty() : Optional.of(earlyResultOnPreflightError);
|
||||
}
|
||||
|
||||
static DeletionStrategy skipDueToVersionConflict(VersionConflictEngineException e,
|
||||
long currentVersion, boolean currentlyDeleted) {
|
||||
return new DeletionStrategy(false, currentlyDeleted,
|
||||
SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND,
|
||||
new DeleteResult(e, currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO,
|
||||
currentlyDeleted == false));
|
||||
}
|
||||
|
||||
static DeletionStrategy processNormally(boolean currentlyDeleted,
|
||||
long seqNoOfDeletion, long versionOfDeletion) {
|
||||
return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion,
|
||||
null);
|
||||
|
||||
}
|
||||
|
||||
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted,
|
||||
long seqNoOfDeletion,
|
||||
long versionOfDeletion) {
|
||||
return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion,
|
||||
null);
|
||||
}
|
||||
}
|
||||
|
||||
private void maybePruneDeletedTombstones() {
|
||||
|
@ -731,84 +992,6 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
private DeleteResult innerDelete(Delete delete) throws IOException {
|
||||
assert assertSequenceNumber(delete.origin(), delete.seqNo());
|
||||
final Translog.Location location;
|
||||
final long updatedVersion;
|
||||
final boolean found;
|
||||
long seqNo = delete.seqNo();
|
||||
try (Releasable ignored = acquireLock(delete.uid())) {
|
||||
lastWriteNanos = delete.startTime();
|
||||
final long currentVersion;
|
||||
final boolean deleted;
|
||||
final VersionValue versionValue = versionMap.getUnderLock(delete.uid());
|
||||
assert incrementVersionLookup();
|
||||
if (versionValue == null) {
|
||||
currentVersion = loadCurrentVersionFromIndex(delete.uid());
|
||||
deleted = currentVersion == Versions.NOT_FOUND;
|
||||
} else {
|
||||
currentVersion = checkDeletedAndGCed(versionValue);
|
||||
deleted = versionValue.delete();
|
||||
}
|
||||
|
||||
final long expectedVersion = delete.version();
|
||||
Optional<DeleteResult> resultOnVersionConflict;
|
||||
try {
|
||||
final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted);
|
||||
resultOnVersionConflict = isVersionConflict ? Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true))
|
||||
: Optional.empty();
|
||||
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
|
||||
resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo()));
|
||||
}
|
||||
final DeleteResult deleteResult;
|
||||
if (resultOnVersionConflict.isPresent()) {
|
||||
deleteResult = resultOnVersionConflict.get();
|
||||
} else {
|
||||
if (delete.origin() == Operation.Origin.PRIMARY) {
|
||||
seqNo = seqNoService().generateSeqNo();
|
||||
}
|
||||
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
|
||||
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
|
||||
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
|
||||
|
||||
versionMap.putUnderLock(delete.uid().bytes(),
|
||||
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis()));
|
||||
}
|
||||
if (!deleteResult.hasFailure()) {
|
||||
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
|
||||
? translog.add(new Translog.Delete(delete, deleteResult))
|
||||
: null;
|
||||
deleteResult.setTranslogLocation(location);
|
||||
}
|
||||
deleteResult.setTook(System.nanoTime() - delete.startTime());
|
||||
deleteResult.freeze();
|
||||
return deleteResult;
|
||||
} finally {
|
||||
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
|
||||
seqNoService().markSeqNoAsCompleted(seqNo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
|
||||
assert uid != null : "uid must not be null";
|
||||
final boolean found;
|
||||
if (currentVersion == Versions.NOT_FOUND) {
|
||||
// doc does not exist and no prior deletes
|
||||
found = false;
|
||||
} else if (versionValue != null && deleted) {
|
||||
// a "delete on delete", in this case, we still increment the version, log it, and return that version
|
||||
found = false;
|
||||
} else {
|
||||
// we deleted a currently existing document
|
||||
// any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming
|
||||
// from this.
|
||||
indexWriter.deleteDocuments(uid);
|
||||
found = true;
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NoOpResult noOp(final NoOp noOp) {
|
||||
NoOpResult noOpResult;
|
||||
|
@ -1059,7 +1242,7 @@ public class InternalEngine extends Engine {
|
|||
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
|
||||
VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
|
||||
if (versionValue != null) {
|
||||
if (timeMSec - versionValue.time() > getGcDeletesInMillis()) {
|
||||
if (timeMSec - versionValue.getTime() > getGcDeletesInMillis()) {
|
||||
versionMap.removeTombstoneUnderLock(uid);
|
||||
}
|
||||
}
|
||||
|
@ -1069,6 +1252,11 @@ public class InternalEngine extends Engine {
|
|||
lastDeleteVersionPruneTimeMSec = timeMSec;
|
||||
}
|
||||
|
||||
// testing
|
||||
void clearDeletedTombstones() {
|
||||
versionMap.clearTombstones();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
|
||||
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
|
||||
|
@ -1302,7 +1490,7 @@ public class InternalEngine extends Engine {
|
|||
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
|
||||
assert incrementIndexVersionLookup();
|
||||
try (Searcher searcher = acquireSearcher("load_version")) {
|
||||
return Versions.loadVersion(searcher.reader(), uid);
|
||||
return VersionsResolver.loadVersion(searcher.reader(), uid);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -164,7 +164,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|||
if (prev != null) {
|
||||
// Deduct RAM for the version we just replaced:
|
||||
long prevBytes = BASE_BYTES_PER_CHM_ENTRY;
|
||||
if (prev.delete() == false) {
|
||||
if (prev.isDelete() == false) {
|
||||
prevBytes += prev.ramBytesUsed() + uidRAMBytesUsed;
|
||||
}
|
||||
ramBytesUsedCurrent.addAndGet(-prevBytes);
|
||||
|
@ -172,13 +172,13 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|||
|
||||
// Add RAM for the new version:
|
||||
long newBytes = BASE_BYTES_PER_CHM_ENTRY;
|
||||
if (version.delete() == false) {
|
||||
if (version.isDelete() == false) {
|
||||
newBytes += version.ramBytesUsed() + uidRAMBytesUsed;
|
||||
}
|
||||
ramBytesUsedCurrent.addAndGet(newBytes);
|
||||
|
||||
final VersionValue prevTombstone;
|
||||
if (version.delete()) {
|
||||
if (version.isDelete()) {
|
||||
// Also enroll the delete into tombstones, and account for its RAM too:
|
||||
prevTombstone = tombstones.put(uid, version);
|
||||
|
||||
|
@ -187,7 +187,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|||
// the accounting to current:
|
||||
ramBytesUsedTombstones.addAndGet(BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
|
||||
|
||||
if (prevTombstone == null && prev != null && prev.delete()) {
|
||||
if (prevTombstone == null && prev != null && prev.isDelete()) {
|
||||
// If prev was a delete that had already been removed from tombstones, then current was already accounting for the
|
||||
// BytesRef/VersionValue RAM, so we now deduct that as well:
|
||||
ramBytesUsedCurrent.addAndGet(-(prev.ramBytesUsed() + uidRAMBytesUsed));
|
||||
|
@ -211,12 +211,12 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|||
|
||||
final VersionValue prev = tombstones.remove(uid);
|
||||
if (prev != null) {
|
||||
assert prev.delete();
|
||||
assert prev.isDelete();
|
||||
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
|
||||
assert v >= 0: "bytes=" + v;
|
||||
}
|
||||
final VersionValue curVersion = maps.current.get(uid);
|
||||
if (curVersion != null && curVersion.delete()) {
|
||||
if (curVersion != null && curVersion.isDelete()) {
|
||||
// We now shift accounting of the BytesRef from tombstones to current, because a refresh would clear this RAM. This should be
|
||||
// uncommon, because with the default refresh=1s and gc_deletes=60s, deletes should be cleared from current long before we drop
|
||||
// them from tombstones:
|
||||
|
@ -234,6 +234,11 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|||
return tombstones.entrySet();
|
||||
}
|
||||
|
||||
/** clears all tombstones ops */
|
||||
void clearTombstones() {
|
||||
tombstones.clear();
|
||||
}
|
||||
|
||||
/** Called when this index is closed. */
|
||||
synchronized void clear() {
|
||||
maps = new Maps();
|
||||
|
|
|
@ -26,6 +26,10 @@ import java.io.IOException;
|
|||
|
||||
public class VersionConflictEngineException extends EngineException {
|
||||
|
||||
public VersionConflictEngineException(ShardId shardId, Engine.Operation op, long currentVersion, boolean deleted) {
|
||||
this(shardId, op.type(), op.id(), op.versionType().explainConflictForWrites(currentVersion, op.version(), deleted));
|
||||
}
|
||||
|
||||
public VersionConflictEngineException(ShardId shardId, String type, String id, String explanation) {
|
||||
this(shardId, null, type, id, explanation);
|
||||
}
|
||||
|
|
|
@ -29,25 +29,25 @@ class VersionValue implements Accountable {
|
|||
|
||||
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class);
|
||||
|
||||
/** the version of the document. used for versioned indexed operations and as a BWC layer, where no seq# are set yet */
|
||||
private final long version;
|
||||
|
||||
VersionValue(long version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public long time() {
|
||||
public long getTime() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public long version() {
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public boolean delete() {
|
||||
public boolean isDelete() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long ramBytesUsed() {
|
||||
return BASE_RAM_BYTES_USED;
|
||||
|
@ -61,7 +61,6 @@ class VersionValue implements Accountable {
|
|||
@Override
|
||||
public String toString() {
|
||||
return "VersionValue{" +
|
||||
"version=" + version +
|
||||
'}';
|
||||
"version=" + version + "}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
|
@ -179,7 +179,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
|
|||
private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, FetchSourceContext fetchSourceContext, Engine.GetResult get, MapperService mapperService) {
|
||||
Map<String, GetField> fields = null;
|
||||
BytesReference source = null;
|
||||
Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
|
||||
DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
|
||||
FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext);
|
||||
if (fieldVisitor != null) {
|
||||
try {
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.lucene.all.AllEntries;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
|
|
@ -26,32 +26,19 @@ import org.apache.lucene.document.SortedNumericDocValuesField;
|
|||
import org.apache.lucene.index.DocValuesType;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
import org.apache.lucene.index.PointValues;
|
||||
import org.apache.lucene.search.BoostQuery;
|
||||
import org.apache.lucene.search.MatchNoDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.action.fieldstats.FieldStats;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexNumericFieldData.NumericType;
|
||||
import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.Mapper;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.index.mapper.MetadataFieldMapper;
|
||||
import org.elasticsearch.index.mapper.ParseContext;
|
||||
import org.elasticsearch.index.mapper.Mapper.TypeParser.ParserContext;
|
||||
import org.elasticsearch.index.mapper.ParseContext.Document;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.index.query.QueryShardException;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.elasticsearch.action.termvectors.TermVectorsResponse;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
|
@ -98,7 +98,7 @@ public class TermVectorsService {
|
|||
final Engine.Searcher searcher = indexShard.acquireSearcher("term_vector");
|
||||
try {
|
||||
Fields topLevelFields = MultiFields.getFields(get.searcher() != null ? get.searcher().reader() : searcher.reader());
|
||||
Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
|
||||
DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
|
||||
/* from an artificial document */
|
||||
if (request.doc() != null) {
|
||||
termVectorsByField = generateTermVectorsFromDoc(indexShard, request);
|
||||
|
|
|
@ -31,17 +31,17 @@ import org.apache.lucene.util.Bits;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
/**
|
||||
* test per-segment lookup of version-related datastructures
|
||||
/**
|
||||
* test per-segment lookup of version-related data structures
|
||||
*/
|
||||
public class VersionLookupTests extends ESTestCase {
|
||||
|
||||
/**
|
||||
/**
|
||||
* test version lookup actually works
|
||||
*/
|
||||
public void testSimple() throws Exception {
|
||||
|
@ -55,20 +55,20 @@ public class VersionLookupTests extends ESTestCase {
|
|||
LeafReaderContext segment = reader.leaves().get(0);
|
||||
PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
|
||||
// found doc
|
||||
DocIdAndVersion result = lookup.lookup(new BytesRef("6"), null, segment);
|
||||
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
|
||||
assertNotNull(result);
|
||||
assertEquals(87, result.version);
|
||||
assertEquals(0, result.docId);
|
||||
// not found doc
|
||||
assertNull(lookup.lookup(new BytesRef("7"), null, segment));
|
||||
assertNull(lookup.lookupVersion(new BytesRef("7"), null, segment));
|
||||
// deleted doc
|
||||
assertNull(lookup.lookup(new BytesRef("6"), new Bits.MatchNoBits(1), segment));
|
||||
assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(1), segment));
|
||||
reader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
/**
|
||||
* test version lookup with two documents matching the ID
|
||||
*/
|
||||
public void testTwoDocuments() throws Exception {
|
||||
|
@ -83,26 +83,26 @@ public class VersionLookupTests extends ESTestCase {
|
|||
LeafReaderContext segment = reader.leaves().get(0);
|
||||
PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
|
||||
// return the last doc when there are duplicates
|
||||
DocIdAndVersion result = lookup.lookup(new BytesRef("6"), null, segment);
|
||||
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
|
||||
assertNotNull(result);
|
||||
assertEquals(87, result.version);
|
||||
assertEquals(1, result.docId);
|
||||
// delete the first doc only
|
||||
FixedBitSet live = new FixedBitSet(2);
|
||||
live.set(1);
|
||||
result = lookup.lookup(new BytesRef("6"), live, segment);
|
||||
result = lookup.lookupVersion(new BytesRef("6"), live, segment);
|
||||
assertNotNull(result);
|
||||
assertEquals(87, result.version);
|
||||
assertEquals(1, result.docId);
|
||||
// delete the second doc only
|
||||
live.clear(1);
|
||||
live.set(0);
|
||||
result = lookup.lookup(new BytesRef("6"), live, segment);
|
||||
result = lookup.lookupVersion(new BytesRef("6"), live, segment);
|
||||
assertNotNull(result);
|
||||
assertEquals(87, result.version);
|
||||
assertEquals(0, result.docId);
|
||||
// delete both docs
|
||||
assertNull(lookup.lookup(new BytesRef("6"), new Bits.MatchNoBits(2), segment));
|
||||
assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(2), segment));
|
||||
reader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
|
|
|
@ -38,6 +38,8 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadDocIdAndVersion;
|
||||
import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadVersion;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
@ -61,15 +63,15 @@ public class VersionsTests extends ESTestCase {
|
|||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
||||
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1));
|
||||
MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
MatcherAssert.assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
|
||||
Document doc = new Document();
|
||||
doc.add(new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
|
||||
writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(1L));
|
||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(1L));
|
||||
assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(1L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(1L));
|
||||
|
||||
doc = new Document();
|
||||
Field uid = new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE);
|
||||
|
@ -78,8 +80,8 @@ public class VersionsTests extends ESTestCase {
|
|||
doc.add(version);
|
||||
writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(2L));
|
||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(2L));
|
||||
assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(2L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(2L));
|
||||
|
||||
// test reuse of uid field
|
||||
doc = new Document();
|
||||
|
@ -89,13 +91,13 @@ public class VersionsTests extends ESTestCase {
|
|||
writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc);
|
||||
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(3L));
|
||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(3L));
|
||||
assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(3L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(3L));
|
||||
|
||||
writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1"));
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue());
|
||||
assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue());
|
||||
directoryReader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
|
@ -121,21 +123,21 @@ public class VersionsTests extends ESTestCase {
|
|||
|
||||
writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
|
||||
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1));
|
||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5L));
|
||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5L));
|
||||
assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5L));
|
||||
|
||||
version.setLongValue(6L);
|
||||
writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
|
||||
version.setLongValue(7L);
|
||||
writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(7L));
|
||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(7L));
|
||||
assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(7L));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(7L));
|
||||
|
||||
writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1"));
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue());
|
||||
assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue());
|
||||
directoryReader.close();
|
||||
writer.close();
|
||||
dir.close();
|
||||
|
@ -143,7 +145,7 @@ public class VersionsTests extends ESTestCase {
|
|||
|
||||
/** Test that version map cache works, is evicted on close, etc */
|
||||
public void testCache() throws Exception {
|
||||
int size = Versions.lookupStates.size();
|
||||
int size = VersionsResolver.lookupStates.size();
|
||||
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
||||
|
@ -153,22 +155,22 @@ public class VersionsTests extends ESTestCase {
|
|||
writer.addDocument(doc);
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
// should increase cache size by 1
|
||||
assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
|
||||
assertEquals(size+1, Versions.lookupStates.size());
|
||||
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
|
||||
assertEquals(size+1, VersionsResolver.lookupStates.size());
|
||||
// should be cache hit
|
||||
assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
|
||||
assertEquals(size+1, Versions.lookupStates.size());
|
||||
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
|
||||
assertEquals(size+1, VersionsResolver.lookupStates.size());
|
||||
|
||||
reader.close();
|
||||
writer.close();
|
||||
// core should be evicted from the map
|
||||
assertEquals(size, Versions.lookupStates.size());
|
||||
assertEquals(size, VersionsResolver.lookupStates.size());
|
||||
dir.close();
|
||||
}
|
||||
|
||||
/** Test that version map cache behaves properly with a filtered reader */
|
||||
public void testCacheFilterReader() throws Exception {
|
||||
int size = Versions.lookupStates.size();
|
||||
int size = VersionsResolver.lookupStates.size();
|
||||
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
||||
|
@ -177,18 +179,18 @@ public class VersionsTests extends ESTestCase {
|
|||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
|
||||
writer.addDocument(doc);
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
|
||||
assertEquals(size+1, Versions.lookupStates.size());
|
||||
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
|
||||
assertEquals(size+1, VersionsResolver.lookupStates.size());
|
||||
// now wrap the reader
|
||||
DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5));
|
||||
assertEquals(87, Versions.loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6")));
|
||||
assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6")));
|
||||
// same size map: core cache key is shared
|
||||
assertEquals(size+1, Versions.lookupStates.size());
|
||||
assertEquals(size+1, VersionsResolver.lookupStates.size());
|
||||
|
||||
reader.close();
|
||||
writer.close();
|
||||
// core should be evicted from the map
|
||||
assertEquals(size, Versions.lookupStates.size());
|
||||
assertEquals(size, VersionsResolver.lookupStates.size());
|
||||
dir.close();
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -20,8 +20,6 @@
|
|||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.lucene.util.RamUsageTester;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogTests;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
public class VersionValueTests extends ESTestCase {
|
||||
|
|
|
@ -98,13 +98,9 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
|||
future.get();
|
||||
thread.join();
|
||||
shards.assertAllEqual(numDocs);
|
||||
Engine engine = IndexShardTests.getEngineFromShard(replica);
|
||||
assertEquals("expected at no version lookups ", InternalEngineTests.getNumVersionLookups((InternalEngine) engine), 0);
|
||||
for (IndexShard shard : shards) {
|
||||
engine = IndexShardTests.getEngineFromShard(shard);
|
||||
assertEquals(0, InternalEngineTests.getNumIndexVersionsLookups((InternalEngine) engine));
|
||||
assertEquals(0, InternalEngineTests.getNumVersionLookups((InternalEngine) engine));
|
||||
}
|
||||
Engine engine = IndexShardTests.getEngineFromShard(shards.getPrimary());
|
||||
assertEquals(0, InternalEngineTests.getNumIndexVersionsLookups((InternalEngine) engine));
|
||||
assertEquals(0, InternalEngineTests.getNumVersionLookups((InternalEngine) engine));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,6 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.SegmentsStats;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.index.mapper.ParseContext;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
|
|
Loading…
Reference in New Issue