Use sequence numbers to identify out of order delivery in replicas & recovery (#24060)

Internal indexing requests in Elasticsearch may be processed out of order and more than once. This happens during recovery and because requests are replicated concurrently from the primary to its replicas. A replica or recovering shard therefore needs to be able to identify that an incoming request carries stale information and need not be processed. The current logic is based on external versions, which is sadly not sufficient. This PR moves the logic to rely on sequence numbers and primary terms, which give the semantics we need.

Relates to #10708
Boaz Leskes 2017-04-14 21:46:17 +02:00 committed by GitHub
parent 162ce85ff2
commit ecf81688fb
26 changed files with 414 additions and 378 deletions
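
At its core, the change swaps version-based staleness checks for a comparison on the (seq#, primary term) pair. A minimal, hypothetical sketch of that rule (not code from this commit; it condenses what compareOpToLuceneDocBasedOnSeqNo below does):

/** Hypothetical helper condensing the staleness rule this commit implements. */
final class SeqNoStaleness {
    /** True if an incoming op is stale or a duplicate of what the shard already holds. */
    static boolean isStaleOrEqual(long opSeqNo, long opTerm, long docSeqNo, long docTerm) {
        if (opSeqNo != docSeqNo) {
            return opSeqNo < docSeqNo;  // a lower seq# always means an older operation
        }
        return opTerm <= docTerm;       // same seq#: the higher primary term wins
    }
}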

View File

@ -29,9 +29,12 @@ import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import java.io.IOException;
@ -43,7 +46,7 @@ import java.io.IOException;
* in more than one document! It will only return the first one it
* finds. */
final class PerThreadIDAndVersionLookup {
final class PerThreadIDVersionAndSeqNoLookup {
// TODO: do we really need to store all this stuff? some of it might not speed up anything.
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
@ -51,7 +54,10 @@ final class PerThreadIDAndVersionLookup {
private final TermsEnum termsEnum;
/** _version data */
private final NumericDocValues versions;
/** _seq_no data */
private final NumericDocValues seqNos;
/** _primary_term data */
private final NumericDocValues primaryTerms;
/** Reused for iteration (when the term exists) */
private PostingsEnum docsEnum;
@ -61,7 +67,7 @@ final class PerThreadIDAndVersionLookup {
/**
* Initialize lookup for the provided segment
*/
PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
PerThreadIDVersionAndSeqNoLookup(LeafReader reader) throws IOException {
Fields fields = reader.fields();
Terms terms = fields.terms(UidFieldMapper.NAME);
termsEnum = terms.iterator();
@ -74,6 +80,8 @@ final class PerThreadIDAndVersionLookup {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME +
"] field");
}
seqNos = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
Object readerKey = null;
assert (readerKey = reader.getCoreCacheKey()) != null;
this.readerKey = readerKey;
@ -113,4 +121,25 @@ final class PerThreadIDAndVersionLookup {
return DocIdSetIterator.NO_MORE_DOCS;
}
}
/** Return null if id is not found. */
DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheKey().equals(readerKey) :
"context's reader is not the same as the reader this class was initialized on.";
int docID = getDocID(id, liveDocs);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return new DocIdAndSeqNo(docID, seqNos == null ? SequenceNumbersService.UNASSIGNED_SEQ_NO : seqNos.get(docID), context);
} else {
return null;
}
}
/**
* Returns {@code 0} if the primary term is not found.
*
* Note that 0 is an illegal primary term. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
**/
long lookUpPrimaryTerm(int docID) throws IOException {
return primaryTerms == null ? 0 : primaryTerms.get(docID);
}
}
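
Both _seq_no and _primary_term are plain numeric doc values here, and segments written before sequence numbers existed simply lack the fields; that is why the lookup falls back to UNASSIGNED_SEQ_NO and 0. A sketch of the null-tolerant read against the Lucene 6.x doc-values API this commit targets (the sentinel value is assumed from SequenceNumbersService):

import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;

final class SeqNoRead {
    static final long UNASSIGNED_SEQ_NO = -2; // assumed value of SequenceNumbersService.UNASSIGNED_SEQ_NO

    /** Reads the _seq_no doc value, tolerating segments that predate sequence numbers. */
    static long readSeqNo(LeafReader reader, int docID) throws IOException {
        NumericDocValues seqNos = reader.getNumericDocValues("_seq_no");
        return seqNos == null ? UNASSIGNED_SEQ_NO : seqNos.get(docID);
    }
}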

View File

@ -0,0 +1,180 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.UidFieldMapper;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerm for a given uid. */
public final class VersionsAndSeqNoResolver {
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup>> lookupStates =
ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
// Evict this reader from lookupStates once it's closed:
private static final CoreClosedListener removeLookupState = key -> {
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.remove(key);
if (ctl != null) {
ctl.close();
}
};
private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader) throws IOException {
Object key = reader.getCoreCacheKey();
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.get(key);
if (ctl == null) {
// First time we are seeing this reader's core; make a new CTL:
ctl = new CloseableThreadLocal<>();
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> other = lookupStates.putIfAbsent(key, ctl);
if (other == null) {
// Our CTL won, we must remove it when the core is closed:
reader.addCoreClosedListener(removeLookupState);
} else {
// Another thread beat us to it: just use their CTL:
ctl = other;
}
}
PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get();
if (lookupState == null) {
lookupState = new PerThreadIDVersionAndSeqNoLookup(reader);
ctl.set(lookupState);
}
return lookupState;
}
private VersionsAndSeqNoResolver() {
}
/** Wraps a {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final LeafReaderContext context;
DocIdAndVersion(int docId, long version, LeafReaderContext context) {
this.docId = docId;
this.version = version;
this.context = context;
}
}
/** Wraps a {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a seqNo. */
public static class DocIdAndSeqNo {
public final int docId;
public final long seqNo;
public final LeafReaderContext context;
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
this.docId = docId;
this.seqNo = seqNo;
this.context = context;
}
}
/**
* Load the internal doc ID and version for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}
/**
* Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and the associated seqNo otherwise
* </ul>
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}
/**
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
*/
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException {
LeafReader leaf = docIdAndSeqNo.context.reader();
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId);
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
return result;
}
/**
* Load the version for the uid from the reader, returning<ul>
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
* <li>the version associated with the provided uid otherwise
* </ul>
*/
public static long loadVersion(IndexReader reader, Term term) throws IOException {
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}
}
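
A usage sketch of the new resolver: callers resolve the doc ID and seq# first and only pay for the primary-term lookup when they actually need the tie-breaker (the reader and uid term are assumed to come from the caller):

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;

final class ResolverUsage {
    static void resolve(IndexReader reader, Term uidTerm) throws IOException {
        DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(reader, uidTerm);
        if (docAndSeqNo == null) {
            return; // uid not found in any live document
        }
        long seqNo = docAndSeqNo.seqNo; // UNASSIGNED_SEQ_NO if the segment predates seq#
        // Second, lazier lookup: only required when an equal seq# must be tie-broken.
        long primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo);
    }
}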

View File

@ -1,263 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class VersionsResolver {
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>>
lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
// Evict this reader from lookupStates once it's closed:
private static final CoreClosedListener removeLookupState = key -> {
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
if (ctl != null) {
ctl.close();
}
};
private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader)
throws IOException {
Object key = reader.getCoreCacheKey();
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
if (ctl == null) {
// First time we are seeing this reader's core; make a
// new CTL:
ctl = new CloseableThreadLocal<>();
CloseableThreadLocal<PerThreadIDAndVersionLookup> other =
lookupStates.putIfAbsent(key, ctl);
if (other == null) {
// Our CTL won, we must remove it when the
// core is closed:
reader.addCoreClosedListener(removeLookupState);
} else {
// Another thread beat us to it: just use
// their CTL:
ctl = other;
}
}
PerThreadIDAndVersionLookup lookupState = ctl.get();
if (lookupState == null) {
lookupState = new PerThreadIDAndVersionLookup(reader);
ctl.set(lookupState);
}
return lookupState;
}
private VersionsResolver() {
}
/**
* Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and
* a version.
**/
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final LeafReaderContext context;
public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
this.docId = docId;
this.version = version;
this.context = context;
}
}
/**
* Load the internal doc ID and version for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
throws IOException {
assert term.field().equals(UidFieldMapper.NAME);
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
DocIdAndVersion result =
lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}
/**
* Load the version for the uid from the reader, returning<ul>
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
* <li>the version associated with the provided uid otherwise
* </ul>
*/
public static long loadVersion(IndexReader reader, Term term) throws IOException {
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}
/**
* Returns the sequence number for the given uid term, returning
* {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found.
*/
public static long loadSeqNo(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();
TermsEnum termsEnum = null;
SortedNumericDocValues dvField = null;
PostingsEnum docsEnum = null;
final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME);
assert dvField != null;
final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc();
d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
dvField.setDocument(docID);
assert dvField.count() == 1 :
"expected only a single value for _seq_no but got " +
dvField.count();
return dvField.valueAt(0);
}
}
}
}
}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
/**
* Returns the primary term for the given uid term, returning {@code 0} if none is found.
*/
public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return 0;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();
TermsEnum termsEnum = null;
NumericDocValues dvField = null;
PostingsEnum docsEnum = null;
final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
assert dvField != null;
final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc();
d != DocIdSetIterator.NO_MORE_DOCS;
d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return dvField.get(docID);
}
}
}
}
}
return 0;
}
}

View File

@ -27,18 +27,13 @@ class DeleteVersionValue extends VersionValue {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(DeleteVersionValue.class);
private final long time;
final long time;
DeleteVersionValue(long version, long time) {
super(version);
DeleteVersionValue(long version, long seqNo, long term, long time) {
super(version, seqNo, term);
this.time = time;
}
@Override
public long getTime() {
return this.time;
}
@Override
public boolean isDelete() {
return true;
@ -52,7 +47,9 @@ class DeleteVersionValue extends VersionValue {
@Override
public String toString() {
return "DeleteVersionValue{" +
"version=" + getVersion() +
"version=" + version +
", seqNo=" + seqNo +
", term=" + term +
",time=" + time +
'}';
}
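
Tombstones now carry the full (version, seq#, term) tuple plus the deletion timestamp that drives garbage collection. A sketch of the prune check (it mirrors the condition in InternalEngine further down; assumes package access to the time field):

final class TombstoneGc {
    /** A tombstone becomes prunable once the delete is older than the GC window. */
    static boolean prunable(DeleteVersionValue tombstone, long nowInMillis, long gcDeletesInMillis) {
        return nowInMillis - tombstone.time > gcDeletesInMillis;
    }
}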

View File

@ -55,8 +55,8 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsResolver;
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@ -464,7 +464,7 @@ public abstract class Engine implements Closeable {
final Searcher searcher = searcherFactory.apply("get");
final DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = VersionsResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
} catch (Exception e) {
Releasables.closeWhileHandlingException(searcher);
//TODO: A better exception goes here

View File

@ -51,7 +51,8 @@ import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -389,10 +390,10 @@ public class InternalEngine extends Engine {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.getVersion(), get.version())) {
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
get.versionType().explainConflictForReads(versionValue.getVersion(), get.version()));
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
refresh("realtime_get");
}
@ -416,6 +417,43 @@ public class InternalEngine extends Engine {
LUCENE_DOC_NOT_FOUND
}
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
final VersionValue versionValue = versionMap.getUnderLock(op.uid());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo ||
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
} else {
// load from index
assert incrementIndexVersionLookup();
try (Searcher searcher = acquireSearcher("load_seq_no")) {
DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
if (docAndSeqNo == null) {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
// load term to tie break
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo);
if (op.primaryTerm() > existingTerm) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
}
}
return status;
}
/** resolves the current version of the document, returning null if not found */
private VersionValue resolveDocVersion(final Operation op) throws IOException {
assert incrementVersionLookup(); // used for asserting in tests
@ -424,11 +462,10 @@ public class InternalEngine extends Engine {
assert incrementIndexVersionLookup(); // used for asserting in tests
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
if (currentVersion != Versions.NOT_FOUND) {
versionValue = new VersionValue(currentVersion);
versionValue = new VersionValue(currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0L);
}
} else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() &&
(engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.getTime()) >
getGcDeletesInMillis()) {
(engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) {
versionValue = null;
}
return versionValue;
@ -436,12 +473,13 @@ public class InternalEngine extends Engine {
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation op)
throws IOException {
assert op.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO : "op is resolved based on versions but have a seq#";
assert op.version() >= 0 : "versions should be non-negative. got " + op.version();
final VersionValue versionValue = resolveDocVersion(op);
if (versionValue == null) {
return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else {
return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ?
return op.versionType().isVersionConflictForWrites(versionValue.version, op.version(), versionValue.isDelete()) ?
OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER;
}
}
@ -601,7 +639,16 @@ public class InternalEngine extends Engine {
// unlike the primary, replicas don't really care about the creation status of documents
// this allows us to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
} else {
// This can happen if the primary is still on an old node and sends traffic without seq#,
// or if we recover from a translog created by an old version.
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) :
"index is newly created but op has no sequence numbers. op: " + index;
opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
}
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
} else {
@ -633,7 +680,7 @@ public class InternalEngine extends Engine {
currentVersion = Versions.NOT_FOUND;
currentNotFoundOrDeleted = true;
} else {
currentVersion = versionValue.getVersion();
currentVersion = versionValue.version;
currentNotFoundOrDeleted = versionValue.isDelete();
}
if (index.versionType().isVersionConflictForWrites(
@ -671,9 +718,9 @@ public class InternalEngine extends Engine {
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
index(index.docs(), indexWriter);
}
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(plan.versionForIndexing));
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing,
plan.currentNotFoundOrDeleted);
versionMap.putUnderLock(index.uid().bytes(),
new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
@ -873,7 +920,14 @@ public class InternalEngine extends Engine {
// unlike the primary, replicas don't really care about the found status of documents
// this allows us to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
final OpVsLuceneDocStatus opVsLucene;
if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
} else {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1_UNRELEASED) :
"index is newly created but op has no sequence numbers. op: " + delete;
opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
}
final DeletionStrategy plan;
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
@ -898,7 +952,7 @@ public class InternalEngine extends Engine {
currentVersion = Versions.NOT_FOUND;
currentlyDeleted = true;
} else {
currentVersion = versionValue.getVersion();
currentVersion = versionValue.version;
currentlyDeleted = versionValue.isDelete();
}
final DeletionStrategy plan;
@ -923,7 +977,7 @@ public class InternalEngine extends Engine {
indexWriter.deleteDocuments(delete.uid());
}
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(plan.versionOfDeletion,
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()));
return new DeleteResult(
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
@ -1235,14 +1289,14 @@ public class InternalEngine extends Engine {
// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...
// we only need to prune the deletes map; the current/old version maps are cleared on refresh:
for (Map.Entry<BytesRef, VersionValue> entry : versionMap.getAllTombstones()) {
for (Map.Entry<BytesRef, DeleteVersionValue> entry : versionMap.getAllTombstones()) {
BytesRef uid = entry.getKey();
try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
if (versionValue != null) {
if (timeMSec - versionValue.getTime() > getGcDeletesInMillis()) {
if (timeMSec - versionValue.time > getGcDeletesInMillis()) {
versionMap.removeTombstoneUnderLock(uid);
}
}
@ -1490,7 +1544,7 @@ public class InternalEngine extends Engine {
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
assert incrementIndexVersionLookup();
try (Searcher searcher = acquireSearcher("load_version")) {
return VersionsResolver.loadVersion(searcher.reader(), uid);
return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
}
}
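
The engine now dispatches between two comparison paths based on whether the op carries a seq#: the new seq#-based check for requests from seq#-aware primaries, and the old version-based check kept for BWC with pre-seq# primaries and old translogs. A simplified sketch of that dispatch (the branch appears inline at both the index and delete call sites above; this helper name is hypothetical):

private OpVsLuceneDocStatus compareOpToLuceneDoc(final Operation op) throws IOException {
    if (op.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
        return compareOpToLuceneDocBasedOnSeqNo(op);      // normal path: seq# and term decide
    }
    // BWC path: the primary is pre-seq#, or we are replaying a translog from an old version.
    return compareOpToLuceneDocBasedOnVersions(op);
}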

View File

@ -55,7 +55,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
}
// All deletes also go here, and delete "tombstones" are retained after refresh:
private final Map<BytesRef,VersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private final Map<BytesRef,DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private volatile Maps maps = new Maps();
@ -180,7 +180,7 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
final VersionValue prevTombstone;
if (version.isDelete()) {
// Also enroll the delete into tombstones, and account for its RAM too:
prevTombstone = tombstones.put(uid, version);
prevTombstone = tombstones.put(uid, (DeleteVersionValue)version);
// We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up
// on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift
@ -225,12 +225,12 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
}
/** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */
VersionValue getTombstoneUnderLock(BytesRef uid) {
DeleteVersionValue getTombstoneUnderLock(BytesRef uid) {
return tombstones.get(uid);
}
/** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */
Iterable<Map.Entry<BytesRef,VersionValue>> getAllTombstones() {
Iterable<Map.Entry<BytesRef, DeleteVersionValue>> getAllTombstones() {
return tombstones.entrySet();
}

View File

@ -30,18 +30,17 @@ class VersionValue implements Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class);
/** The version of the document. Used for versioned indexing operations and as a BWC layer, where no seq# are set yet */
private final long version;
final long version;
VersionValue(long version) {
/** the sequence number of the operation that last changed the associated uid */
final long seqNo;
/** the primary term of the operation that last changed the associated uid */
final long term;
VersionValue(long version, long seqNo, long term) {
this.version = version;
}
public long getTime() {
throw new UnsupportedOperationException();
}
public long getVersion() {
return version;
this.seqNo = seqNo;
this.term = term;
}
public boolean isDelete() {
@ -61,6 +60,9 @@ class VersionValue implements Accountable {
@Override
public String toString() {
return "VersionValue{" +
"version=" + version + "}";
"version=" + version +
", seqNo=" + seqNo +
", term=" + term +
'}';
}
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.set.Sets;

View File

@ -254,12 +254,12 @@ public abstract class ParseContext {
}
@Override
public SeqNoFieldMapper.SequenceID seqID() {
public SeqNoFieldMapper.SequenceIDFields seqID() {
return in.seqID();
}
@Override
public void seqID(SeqNoFieldMapper.SequenceID seqID) {
public void seqID(SeqNoFieldMapper.SequenceIDFields seqID) {
in.seqID(seqID);
}
@ -310,7 +310,7 @@ public abstract class ParseContext {
private Field version;
private SeqNoFieldMapper.SequenceID seqID;
private SeqNoFieldMapper.SequenceIDFields seqID;
private final AllEntries allEntries;
@ -404,12 +404,12 @@ public abstract class ParseContext {
}
@Override
public SeqNoFieldMapper.SequenceID seqID() {
public SeqNoFieldMapper.SequenceIDFields seqID() {
return this.seqID;
}
@Override
public void seqID(SeqNoFieldMapper.SequenceID seqID) {
public void seqID(SeqNoFieldMapper.SequenceIDFields seqID) {
this.seqID = seqID;
}
@ -539,9 +539,9 @@ public abstract class ParseContext {
public abstract void version(Field version);
public abstract SeqNoFieldMapper.SequenceID seqID();
public abstract SeqNoFieldMapper.SequenceIDFields seqID();
public abstract void seqID(SeqNoFieldMapper.SequenceID seqID);
public abstract void seqID(SeqNoFieldMapper.SequenceIDFields seqID);
public final boolean includeInAll(Boolean includeInAll, FieldMapper mapper) {
return includeInAll(includeInAll, mapper.fieldType().indexOptions() != IndexOptions.NONE);

View File

@ -36,7 +36,7 @@ public class ParsedDocument {
private final String id, type;
private final BytesRef uid;
private final SeqNoFieldMapper.SequenceID seqID;
private final SeqNoFieldMapper.SequenceIDFields seqID;
private final String routing;
@ -50,7 +50,7 @@ public class ParsedDocument {
private String parent;
public ParsedDocument(Field version,
SeqNoFieldMapper.SequenceID seqID,
SeqNoFieldMapper.SequenceIDFields seqID,
String id,
String type,
String routing,

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.mapper;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
@ -66,13 +65,13 @@ public class SeqNoFieldMapper extends MetadataFieldMapper {
* A sequence ID, which is made up of a sequence number (both the searchable
* and doc_value version of the field) and the primary term.
*/
public static class SequenceID {
public static class SequenceIDFields {
public final Field seqNo;
public final Field seqNoDocValue;
public final Field primaryTerm;
public SequenceID(Field seqNo, Field seqNoDocValue, Field primaryTerm) {
public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) {
Objects.requireNonNull(seqNo, "sequence number field cannot be null");
Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null");
Objects.requireNonNull(primaryTerm, "primary term field cannot be null");
@ -81,9 +80,9 @@ public class SeqNoFieldMapper extends MetadataFieldMapper {
this.primaryTerm = primaryTerm;
}
public static SequenceID emptySeqID() {
return new SequenceID(new LongPoint(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO),
new SortedNumericDocValuesField(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO),
public static SequenceIDFields emptySeqID() {
return new SequenceIDFields(new LongPoint(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO),
new NumericDocValuesField(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO),
new NumericDocValuesField(PRIMARY_TERM_NAME, 0));
}
}
@ -242,7 +241,7 @@ public class SeqNoFieldMapper extends MetadataFieldMapper {
protected void parseCreateField(ParseContext context, List<IndexableField> fields) throws IOException {
// see InternalEngine.innerIndex to see where the real version value is set
// also see ParsedDocument.updateSeqID (called by innerIndex)
SequenceID seqID = SequenceID.emptySeqID();
SequenceIDFields seqID = SequenceIDFields.emptySeqID();
context.seqID(seqID);
fields.add(seqID.seqNo);
fields.add(seqID.seqNoDocValue);
@ -264,7 +263,7 @@ public class SeqNoFieldMapper extends MetadataFieldMapper {
for (int i = 1; i < context.docs().size(); i++) {
final Document doc = context.docs().get(i);
doc.add(new LongPoint(NAME, 1));
doc.add(new SortedNumericDocValuesField(NAME, 1L));
doc.add(new NumericDocValuesField(NAME, 1L));
doc.add(new NumericDocValuesField(PRIMARY_TERM_NAME, 0L));
}
}
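
With the switch from SortedNumericDocValuesField to NumericDocValuesField, every parsed document carries three seq-ID fields: a LongPoint for range queries on _seq_no, a plain numeric doc value for the per-document lookup, and a numeric doc value for _primary_term. A sketch of attaching them, mirroring the test fixtures later in this diff (field values are placeholders until the engine assigns real ones):

import org.apache.lucene.document.Document;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

final class SeqIdFieldsUsage {
    static Document withSeqIdFields() {
        SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
        Document doc = new Document();
        doc.add(seqID.seqNo);         // LongPoint on _seq_no, used for range queries
        doc.add(seqID.seqNoDocValue); // numeric doc value on _seq_no, read by the uid lookup
        doc.add(seqID.primaryTerm);   // numeric doc value on _primary_term
        return doc;
    }
}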

View File

@ -34,7 +34,7 @@ import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;

View File

@ -31,7 +31,7 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.test.ESTestCase;
@ -53,7 +53,7 @@ public class VersionLookupTests extends ESTestCase {
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
LeafReaderContext segment = reader.leaves().get(0);
PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader());
// found doc
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
assertNotNull(result);
@ -81,7 +81,7 @@ public class VersionLookupTests extends ESTestCase {
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
LeafReaderContext segment = reader.leaves().get(0);
PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader());
// return the last doc when there are duplicates
DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
assertNotNull(result);

View File

@ -38,8 +38,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadDocIdAndVersion;
import static org.elasticsearch.common.lucene.uid.VersionsResolver.loadVersion;
import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion;
import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadVersion;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@ -145,7 +145,7 @@ public class VersionsTests extends ESTestCase {
/** Test that version map cache works, is evicted on close, etc */
public void testCache() throws Exception {
int size = VersionsResolver.lookupStates.size();
int size = VersionsAndSeqNoResolver.lookupStates.size();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
@ -156,21 +156,21 @@ public class VersionsTests extends ESTestCase {
DirectoryReader reader = DirectoryReader.open(writer);
// should increase cache size by 1
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
assertEquals(size+1, VersionsResolver.lookupStates.size());
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
// should be cache hit
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
assertEquals(size+1, VersionsResolver.lookupStates.size());
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
reader.close();
writer.close();
// core should be evicted from the map
assertEquals(size, VersionsResolver.lookupStates.size());
assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size());
dir.close();
}
/** Test that version map cache behaves properly with a filtered reader */
public void testCacheFilterReader() throws Exception {
int size = VersionsResolver.lookupStates.size();
int size = VersionsAndSeqNoResolver.lookupStates.size();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
@ -180,17 +180,17 @@ public class VersionsTests extends ESTestCase {
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
assertEquals(size+1, VersionsResolver.lookupStates.size());
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
// now wrap the reader
DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5));
assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6")));
// same size map: core cache key is shared
assertEquals(size+1, VersionsResolver.lookupStates.size());
assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size());
reader.close();
writer.close();
// core should be evicted from the map
assertEquals(size, VersionsResolver.lookupStates.size());
assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size());
dir.close();
}
}

View File

@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.startsWith;
public class IndexingSlowLogTests extends ESTestCase {
public void testSlowLogParsedDocumentPrinterSourceToLog() throws IOException {
BytesReference source = JsonXContent.contentBuilder().startObject().field("foo", "bar").endObject().bytes();
ParsedDocument pd = new ParsedDocument(new NumericDocValuesField("version", 1), SeqNoFieldMapper.SequenceID.emptySeqID(), "id",
ParsedDocument pd = new ParsedDocument(new NumericDocValuesField("version", 1), SeqNoFieldMapper.SequenceIDFields.emptySeqID(), "id",
"test", null, null, source, XContentType.JSON, null);
Index index = new Index("foo", "123");
// Turning off document logging doesn't log source[]

View File

@ -83,7 +83,8 @@ import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
@ -292,7 +293,7 @@ public class InternalEngineTests extends ESTestCase {
private static ParsedDocument testParsedDocument(String id, String type, String routing, Document document, BytesReference source, Mapping mappingUpdate) {
Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", 0);
SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
@ -1369,19 +1370,10 @@ public class InternalEngineTests extends ESTestCase {
public void testOutOfOrderDocsOnReplica() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20);
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
}
public void testNonStandardVersioningOnReplica() throws IOException {
// TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order
// is detected using seq#
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, false);
}
public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder()
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
@ -3601,9 +3593,17 @@ public class InternalEngineTests extends ESTestCase {
*/
private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws EngineException {
try (Searcher searcher = engine.acquireSearcher("get")) {
long seqNum = VersionsResolver.loadSeqNo(searcher.reader(), get.uid());
long primaryTerm = VersionsResolver.loadPrimaryTerm(searcher.reader(), get.uid());
return new Tuple<>(seqNum, primaryTerm);
final long primaryTerm;
final long seqNo;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), get.uid());
if (docIdAndSeqNo == null) {
primaryTerm = 0;
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
} else {
seqNo = docIdAndSeqNo.seqNo;
primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo);
}
return new Tuple<>(seqNo, primaryTerm);
} catch (Exception e) {
throw new EngineException(shardId, "unable to retrieve sequence id", e);
}

View File

@ -33,7 +33,7 @@ public class LiveVersionMapTests extends ESTestCase {
for (int i = 0; i < 100000; ++i) {
BytesRefBuilder uid = new BytesRefBuilder();
uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20));
VersionValue version = new VersionValue(randomLong());
VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong());
map.putUnderLock(uid.toBytesRef(), version);
}
long actualRamBytesUsed = RamUsageTester.sizeOf(map);
@ -48,7 +48,7 @@ public class LiveVersionMapTests extends ESTestCase {
for (int i = 0; i < 100000; ++i) {
BytesRefBuilder uid = new BytesRefBuilder();
uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20));
VersionValue version = new VersionValue(randomLong());
VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong());
map.putUnderLock(uid.toBytesRef(), version);
}
actualRamBytesUsed = RamUsageTester.sizeOf(map);

View File

@ -25,12 +25,12 @@ import org.elasticsearch.test.ESTestCase;
public class VersionValueTests extends ESTestCase {
public void testRamBytesUsed() {
VersionValue versionValue = new VersionValue(randomLong());
VersionValue versionValue = new VersionValue(randomLong(), randomLong(), randomLong());
assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed());
}
public void testDeleteRamBytesUsed() {
DeleteVersionValue versionValue = new DeleteVersionValue(randomLong(), randomLong());
DeleteVersionValue versionValue = new DeleteVersionValue(randomLong(), randomLong(), randomLong(), randomLong());
assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed());
}

View File

@ -27,13 +27,9 @@ import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkItemRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
@ -98,6 +94,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
protected IndexMetaData buildIndexMetaData(int replicas) throws IOException {
return buildIndexMetaData(replicas, indexMapping);
}
protected IndexMetaData buildIndexMetaData(int replicas, Map<String, String> mappings) throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -105,7 +105,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName())
.settings(settings)
.primaryTerm(0, 1);
for (Map.Entry<String, String> typeMapping : indexMapping.entrySet()) {
for (Map.Entry<String, String> typeMapping : mappings.entrySet()) {
metaData.putMapping(typeMapping.getKey(), typeMapping.getValue());
}
return metaData.build();
@ -224,15 +224,24 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
updateAllocationIDsOnPrimary();
}
public synchronized IndexShard addReplica() throws IOException {
public IndexShard addReplica() throws IOException {
final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false);
final IndexShard replica =
newShard(replicaRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(replicaRouting));
replicas.add(replica);
updateAllocationIDsOnPrimary();
addReplica(replica);
return replica;
}
public synchronized void addReplica(IndexShard replica) {
assert shardRoutings().stream()
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
replica.updatePrimaryTerm(primary.getPrimaryTerm());
replicas.add(replica);
updateAllocationIDsOnPrimary();
}
public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
@ -264,6 +273,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
boolean found = replicas.remove(replica);
assert found;
closeShards(primary);
primary = replica;
replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
updateAllocationIDsOnPrimary();

View File

@ -18,6 +18,9 @@
*/
package org.elasticsearch.index.replication;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
@ -37,6 +40,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@ -152,4 +156,28 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
}
}
public void testConflictingOpsOnReplica() throws Exception {
Map<String, String> mappings =
Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) {
shards.startAll();
IndexShard replica1 = shards.getReplicas().get(0);
logger.info("--> isolated replica " + replica1.routingEntry());
shards.removeReplica(replica1);
IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON);
shards.index(indexRequest);
shards.addReplica(replica1);
logger.info("--> promoting replica to primary " + replica1.routingEntry());
shards.promoteReplicaToPrimary(replica1);
indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"2\"}", XContentType.JSON);
shards.index(indexRequest);
shards.refresh("test");
for (IndexShard shard : shards) {
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
TopDocs search = searcher.searcher().search(new TermQuery(new Term("f", "2")), 10);
assertEquals("shard " + shard.routingEntry() + " misses new version", 1, search.totalHits);
}
}
}
}
}

View File

@ -107,7 +107,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
Mapping mappingUpdate) {
Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", 0);
SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);

View File

@ -551,7 +551,7 @@ public class IndexShardTests extends IndexShardTestCase {
ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", 0);
SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);

View File

@ -332,7 +332,7 @@ public class RefreshListenersTests extends ESTestCase {
document.add(new TextField("test", testFieldValue, Field.Store.YES));
Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY);
SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);

View File

@ -2048,7 +2048,7 @@ public class TranslogTests extends ESTestCase {
public void testTranslogOpSerialization() throws Exception {
BytesReference B_1 = new BytesArray(new byte[]{1});
SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
assert Version.CURRENT.major <= 6 : "Using UNASSIGNED_SEQ_NO can be removed in 7.0, because 6.0+ nodes have actual sequence numbers";
long randomSeqNum = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong();
long randomPrimaryTerm = randomBoolean() ? 0 : randomNonNegativeLong();

View File

@ -204,7 +204,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
document.add(new TextField("test", "test", Field.Store.YES));
final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY);
final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);