Store _version as a numeric doc values field.
Doc values can be expected to be more compact than payloads and should provide better flexibility, since doc values formats can be picked on a per-field basis. This patch:
- stores _version as a numeric doc values field,
- manages backwards compatibility: if a version is not found in doc values, it falls back to looking into payloads,
- uses background merges to upgrade old segments and move _version from payloads to doc values.

Closes #3103
parent 5ea6c77dad
commit 490c7103ae
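At a high level, each document now carries its version as a per-document numeric doc values entry next to the indexed _uid term, and readers resolve it per segment instead of scanning term payloads. Below is a minimal standalone sketch of that idea against the Lucene 4.2 API this patch targets; the field names match the patch, but everything else (analyzer, directory, stored/indexed options) is illustrative and simply mirrors the updated benchmark at the bottom of this diff, not the mapper's exact field configuration.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class VersionDocValuesSketch {
    public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir,
                new IndexWriterConfig(Version.LUCENE_42, new StandardAnalyzer(Version.LUCENE_42)));

        // index the uid as a plain indexed field and the version as numeric doc values
        Document doc = new Document();
        doc.add(new StringField("_uid", "type#1", Store.NO));
        doc.add(new NumericDocValuesField("_version", 3L));
        writer.addDocument(doc);
        writer.commit();

        // resolve the version per segment, without touching postings payloads
        DirectoryReader reader = DirectoryReader.open(dir);
        for (AtomicReaderContext leaf : reader.leaves()) {
            NumericDocValues versions = leaf.reader().getNumericDocValues("_version");
            if (versions != null) {
                System.out.println("version of doc 0: " + versions.get(0));
            }
        }
        reader.close();
        writer.close();
    }
}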
@ -1,213 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.lucene.uid;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
|
||||
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.Numbers;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Reader;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
// TODO: LUCENE 4 UPGRADE: Store version as doc values instead of as a payload.
|
||||
public class UidField extends Field {
|
||||
|
||||
public static class DocIdAndVersion {
|
||||
public final int docId;
|
||||
public final long version;
|
||||
public final AtomicReaderContext reader;
|
||||
|
||||
public DocIdAndVersion(int docId, long version, AtomicReaderContext reader) {
|
||||
this.docId = docId;
|
||||
this.version = version;
|
||||
this.reader = reader;
|
||||
}
|
||||
}
|
||||
|
||||
// this works fine for nested docs since they don't have the payload which has the version
|
||||
// so we iterate till we find the one with the payload
|
||||
public static DocIdAndVersion loadDocIdAndVersion(AtomicReaderContext context, Term term) {
|
||||
int docId = Lucene.NO_DOC;
|
||||
try {
|
||||
Terms terms = context.reader().terms(term.field());
|
||||
if (terms == null) {
|
||||
return null;
|
||||
}
|
||||
final TermsEnum termsEnum = terms.iterator(null);
|
||||
if (termsEnum == null) {
|
||||
return null;
|
||||
}
|
||||
if (!termsEnum.seekExact(term.bytes(), true)) {
|
||||
return null;
|
||||
}
|
||||
DocsAndPositionsEnum uid = termsEnum.docsAndPositions(context.reader().getLiveDocs(), null, DocsAndPositionsEnum.FLAG_PAYLOADS);
|
||||
if (uid == null || uid.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return null; // no doc
|
||||
}
|
||||
// Note, only master docs uid have version payload, so we can use that info to not
|
||||
// take them into account
|
||||
do {
|
||||
docId = uid.docID();
|
||||
uid.nextPosition();
|
||||
if (uid.getPayload() == null) {
|
||||
continue;
|
||||
}
|
||||
if (uid.getPayload().length < 8) {
|
||||
continue;
|
||||
}
|
||||
byte[] payload = new byte[uid.getPayload().length];
|
||||
System.arraycopy(uid.getPayload().bytes, uid.getPayload().offset, payload, 0, uid.getPayload().length);
|
||||
return new DocIdAndVersion(docId, Numbers.bytesToLong(payload), context);
|
||||
} while (uid.nextDoc() != DocIdSetIterator.NO_MORE_DOCS);
|
||||
return new DocIdAndVersion(docId, -2, context);
|
||||
} catch (Exception e) {
|
||||
return new DocIdAndVersion(docId, -2, context);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the version for the uid from the reader, returning -1 if no doc exists, or -2 if
|
||||
* no version is available (for backward comp.)
|
||||
*/
|
||||
public static long loadVersion(AtomicReaderContext context, Term term) {
|
||||
try {
|
||||
Terms terms = context.reader().terms(term.field());
|
||||
if (terms == null) {
|
||||
return -1;
|
||||
}
|
||||
final TermsEnum termsEnum = terms.iterator(null);
|
||||
if (termsEnum == null) {
|
||||
return -1;
|
||||
}
|
||||
if (!termsEnum.seekExact(term.bytes(), true)) {
|
||||
return -1;
|
||||
}
|
||||
DocsAndPositionsEnum uid = termsEnum.docsAndPositions(context.reader().getLiveDocs(), null, DocsAndPositionsEnum.FLAG_PAYLOADS);
|
||||
if (uid == null || uid.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return -1;
|
||||
}
|
||||
// Note, only master docs uid have version payload, so we can use that info to not
|
||||
// take them into account
|
||||
do {
|
||||
uid.nextPosition();
|
||||
if (uid.getPayload() == null) {
|
||||
continue;
|
||||
}
|
||||
if (uid.getPayload().length < 8) {
|
||||
continue;
|
||||
}
|
||||
byte[] payload = new byte[uid.getPayload().length];
|
||||
System.arraycopy(uid.getPayload().bytes, uid.getPayload().offset, payload, 0, uid.getPayload().length);
|
||||
return Numbers.bytesToLong(payload);
|
||||
} while (uid.nextDoc() != DocIdSetIterator.NO_MORE_DOCS);
|
||||
return -2;
|
||||
} catch (Exception e) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
|
||||
private String uid;
|
||||
|
||||
private long version;
|
||||
|
||||
public UidField(String uid) {
|
||||
this(UidFieldMapper.NAME, uid, 0);
|
||||
}
|
||||
|
||||
public UidField(String name, String uid, long version) {
|
||||
super(name, UidFieldMapper.Defaults.FIELD_TYPE);
|
||||
this.uid = uid;
|
||||
this.version = version;
|
||||
this.tokenStream = new UidPayloadTokenStream(this);
|
||||
}
|
||||
|
||||
public String uid() {
|
||||
return this.uid;
|
||||
}
|
||||
|
||||
public void setUid(String uid) {
|
||||
this.uid = uid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String stringValue() {
|
||||
return uid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Reader readerValue() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public void version(long version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TokenStream tokenStream(Analyzer analyzer) throws IOException {
|
||||
return tokenStream;
|
||||
}
|
||||
|
||||
public static final class UidPayloadTokenStream extends TokenStream {
|
||||
|
||||
private final PayloadAttribute payloadAttribute = addAttribute(PayloadAttribute.class);
|
||||
private final CharTermAttribute termAtt = addAttribute(CharTermAttribute.class);
|
||||
|
||||
private final UidField field;
|
||||
|
||||
private boolean added = false;
|
||||
|
||||
public UidPayloadTokenStream(UidField field) {
|
||||
this.field = field;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
added = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean incrementToken() throws IOException {
|
||||
if (added) {
|
||||
return false;
|
||||
}
|
||||
termAtt.setLength(0);
|
||||
termAtt.append(field.uid);
|
||||
payloadAttribute.setPayload(new BytesRef(Numbers.longToBytes(field.version())));
|
||||
added = true;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,139 @@
/*
 * Licensed to ElasticSearch and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. ElasticSearch licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.util.Bits;

import org.apache.lucene.index.*;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;

import java.io.IOException;
import java.util.List;

/** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class Versions {

    public static final long NOT_FOUND = -1L;
    public static final long NOT_SET = -2L;

    private Versions() {}

    /** Wraps an {@link AtomicReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
    public static class DocIdAndVersion {
        public final int docId;
        public final long version;
        public final AtomicReaderContext reader;

        public DocIdAndVersion(int docId, long version, AtomicReaderContext reader) {
            this.docId = docId;
            this.version = version;
            this.reader = reader;
        }
    }

    /**
     * Load the internal doc ID and version for the uid from the reader, returning<ul>
     * <li>null if the uid wasn't found,
     * <li>a doc ID and a version otherwise, the version being potentially set to {@link #NOT_SET} if the uid has no associated version
     * </ul>
     */
    public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
        // iterate backwards to optimize for the frequently updated documents
        // which are likely to be in the last segments
        final List<AtomicReaderContext> leaves = reader.leaves();
        for (int i = leaves.size() - 1; i >= 0; --i) {
            final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(leaves.get(i), term);
            if (docIdAndVersion != null) {
                assert docIdAndVersion.version != NOT_FOUND;
                return docIdAndVersion;
            }
        }
        return null;
    }

    /**
     * Load the version for the uid from the reader, returning<ul>
     * <li>{@link #NOT_FOUND} if no matching doc exists,
     * <li>{@link #NOT_SET} if no version is available,
     * <li>the version associated with the provided uid otherwise
     * </ul>
     */
    public static long loadVersion(IndexReader reader, Term term) throws IOException {
        final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
        return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
    }

    /** Same as {@link #loadDocIdAndVersion(IndexReader, Term)} but operates directly on a reader context. */
    public static DocIdAndVersion loadDocIdAndVersion(AtomicReaderContext readerContext, Term term) throws IOException {
        assert term.field().equals(UidFieldMapper.NAME);
        final AtomicReader reader = readerContext.reader();
        final Bits liveDocs = reader.getLiveDocs();
        final Terms terms = reader.terms(UidFieldMapper.NAME);
        assert terms != null : "All segments must have a _uid field, but " + reader + " doesn't";
        final TermsEnum termsEnum = terms.iterator(null);
        final boolean useCache = false; // avoid high cache churn
        if (!termsEnum.seekExact(term.bytes(), useCache)) {
            return null;
        }

        // Versions are stored as doc values...
        final NumericDocValues versions = reader.getNumericDocValues(UidFieldMapper.VERSION);
        if (versions != null || !terms.hasPayloads()) {
            // only the last doc that matches the _uid is interesting here: if it is deleted, then there is
            // no match otherwise previous docs are necessarily either deleted or nested docs
            final DocsEnum docs = termsEnum.docs(null, null);
            int docID = DocsEnum.NO_MORE_DOCS;
            for (int d = docs.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = docs.nextDoc()) {
                docID = d;
            }
            assert docID != DocsEnum.NO_MORE_DOCS; // would mean that the term exists but has no match at all
            if (liveDocs != null && !liveDocs.get(docID)) {
                return null;
            } else if (versions != null) {
                return new DocIdAndVersion(docID, versions.get(docID), readerContext);
            } else {
                // _uid found, but no doc values and no payloads
                return new DocIdAndVersion(docID, NOT_SET, readerContext);
            }
        }

        // ... but used to be stored as payloads
        final DocsAndPositionsEnum dpe = termsEnum.docsAndPositions(liveDocs, null, DocsAndPositionsEnum.FLAG_PAYLOADS);
        assert dpe != null; // terms has payloads
        int docID = DocsEnum.NO_MORE_DOCS;
        for (int d = dpe.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = dpe.nextDoc()) {
            docID = d;
            dpe.nextPosition();
            final BytesRef payload = dpe.getPayload();
            if (payload != null && payload.length == 8) {
                return new DocIdAndVersion(d, Numbers.bytesToLong(payload), readerContext);
            }
        }

        if (docID == DocsEnum.NO_MORE_DOCS) {
            return null;
        } else {
            return new DocIdAndVersion(docID, NOT_SET, readerContext);
        }
    }

}
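The two public entry points above are what the engine, get, TTL, and fetch phases call. For context, here is a hypothetical caller; the reader handling and the raw uid string are made up, while the constants, Term field name, and method signatures come from the class itself.

import java.io.IOException;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.elasticsearch.common.lucene.uid.Versions;

// Hypothetical caller: resolve the version stored for a given raw _uid value.
final class VersionLookupSketch {
    static String describe(DirectoryReader reader, String uid) throws IOException {
        final long version = Versions.loadVersion(reader, new Term("_uid", uid));
        if (version == Versions.NOT_FOUND) {
            return "no live document carries this uid";
        }
        if (version == Versions.NOT_SET) {
            return "document exists but predates versioning (no payload, no doc values)";
        }
        return "current version is " + version;
    }
}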
@@ -19,12 +19,15 @@

package org.elasticsearch.index.codec;

import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.diskdv.DiskDocValuesFormat;
import org.apache.lucene.codecs.lucene42.Lucene42Codec;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.mapper.FieldMappers;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;

/**
 * {@link PerFieldMappingPostingFormatCodec This postings format} is the default

@@ -39,11 +42,13 @@ public class PerFieldMappingPostingFormatCodec extends Lucene42Codec {
    private final ESLogger logger;
    private final MapperService mapperService;
    private final PostingsFormat defaultPostingFormat;
    private final DocValuesFormat diskDocValuesFormat;

    public PerFieldMappingPostingFormatCodec(MapperService mapperService, PostingsFormat defaultPostingFormat, ESLogger logger) {
        this.mapperService = mapperService;
        this.logger = logger;
        this.defaultPostingFormat = defaultPostingFormat;
        this.diskDocValuesFormat = new DiskDocValuesFormat();
    }

    @Override

@@ -56,4 +61,14 @@ public class PerFieldMappingPostingFormatCodec extends Lucene42Codec {
        PostingsFormatProvider postingsFormat = indexName.mapper().postingsFormatProvider();
        return postingsFormat != null ? postingsFormat.get() : defaultPostingFormat;
    }

    @Override
    public DocValuesFormat getDocValuesFormatForField(String field) {
        if (UidFieldMapper.VERSION.equals(field)) {
            // Use DiskDVF for version by default
            // TODO: Make it configurable
            return diskDocValuesFormat;
        }
        return super.getDocValuesFormatForField(field);
    }
}
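The override above is the only codec-level change: _version doc values get the disk-backed format while every other field keeps the Lucene 4.2 defaults. As an isolated illustration of the same mechanism, the following sketch is mine; the class name and the hard-coded "_version" literal are assumptions, whereas the real patch goes through UidFieldMapper.VERSION inside PerFieldMappingPostingFormatCodec.

import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.diskdv.DiskDocValuesFormat;
import org.apache.lucene.codecs.lucene42.Lucene42Codec;

// Route one doc values field to the disk-based format; others keep the default.
public class DiskVersionCodec extends Lucene42Codec {

    private final DocValuesFormat disk = new DiskDocValuesFormat();

    @Override
    public DocValuesFormat getDocValuesFormatForField(String field) {
        if ("_version".equals(field)) {
            return disk; // read lazily from disk instead of being loaded on the heap
        }
        return super.getDocValuesFormatForField(field);
    }
}

Such a codec would then be applied by setting it on the IndexWriterConfig (IndexWriterConfig.setCodec) when segments are written.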
@ -31,7 +31,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.CloseableComponent;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
|
@ -392,6 +392,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
this.doc = doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocumentMapper docMapper() {
|
||||
return this.docMapper;
|
||||
}
|
||||
|
@ -411,6 +412,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
return this.origin;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParsedDocument parsedDoc() {
|
||||
return this.doc;
|
||||
}
|
||||
|
@ -445,6 +447,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
|
||||
public Create version(long version) {
|
||||
this.version = version;
|
||||
this.doc.version().setLongValue(version);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -461,6 +464,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
return this.doc.parent();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> docs() {
|
||||
return this.doc.docs();
|
||||
}
|
||||
|
@ -473,11 +477,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
return this.doc.source();
|
||||
}
|
||||
|
||||
public UidField uidField() {
|
||||
return doc.uid();
|
||||
}
|
||||
|
||||
|
||||
public Create startTime(long startTime) {
|
||||
this.startTime = startTime;
|
||||
return this;
|
||||
|
@ -520,6 +519,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
this.doc = doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocumentMapper docMapper() {
|
||||
return this.docMapper;
|
||||
}
|
||||
|
@ -543,12 +543,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
return this.uid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParsedDocument parsedDoc() {
|
||||
return this.doc;
|
||||
}
|
||||
|
||||
public Index version(long version) {
|
||||
this.version = version;
|
||||
doc.version().setLongValue(version);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -565,6 +567,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
return this.versionType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> docs() {
|
||||
return this.doc.docs();
|
||||
}
|
||||
|
@ -601,10 +604,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
return this.doc.source();
|
||||
}
|
||||
|
||||
public UidField uidField() {
|
||||
return doc.uid();
|
||||
}
|
||||
|
||||
public Index startTime(long startTime) {
|
||||
this.startTime = startTime;
|
||||
return this;
|
||||
|
@ -834,7 +833,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
private final boolean exists;
|
||||
private final long version;
|
||||
private final Translog.Source source;
|
||||
private final UidField.DocIdAndVersion docIdAndVersion;
|
||||
private final Versions.DocIdAndVersion docIdAndVersion;
|
||||
private final Searcher searcher;
|
||||
|
||||
public static final GetResult NOT_EXISTS = new GetResult(false, -1, null);
|
||||
|
@ -847,7 +846,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
this.searcher = null;
|
||||
}
|
||||
|
||||
public GetResult(Searcher searcher, UidField.DocIdAndVersion docIdAndVersion) {
|
||||
public GetResult(Searcher searcher, Versions.DocIdAndVersion docIdAndVersion) {
|
||||
this.exists = true;
|
||||
this.source = null;
|
||||
this.version = docIdAndVersion.version;
|
||||
|
@ -872,7 +871,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
|||
return this.searcher;
|
||||
}
|
||||
|
||||
public UidField.DocIdAndVersion docIdAndVersion() {
|
||||
public Versions.DocIdAndVersion docIdAndVersion() {
|
||||
return docIdAndVersion;
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.lucene.HashedBytesRef;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.search.XFilteredQuery;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -50,6 +50,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
|||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.engine.*;
|
||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
|
||||
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
|
||||
|
@ -332,20 +333,15 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
// no version, get the version from the index, we know that we refresh on flush
|
||||
Searcher searcher = searcher();
|
||||
try {
|
||||
List<AtomicReaderContext> readers = searcher.reader().leaves();
|
||||
for (int i = 0; i < readers.size(); i++) {
|
||||
AtomicReaderContext readerContext = readers.get(i);
|
||||
UidField.DocIdAndVersion docIdAndVersion = UidField.loadDocIdAndVersion(readerContext, get.uid());
|
||||
if (docIdAndVersion != null && docIdAndVersion.docId != Lucene.NO_DOC) {
|
||||
// note, we don't release the searcher here, since it will be released as part of the external
|
||||
// API usage, since it still needs it to load data...
|
||||
return new GetResult(searcher, docIdAndVersion);
|
||||
}
|
||||
final Versions.DocIdAndVersion docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
|
||||
if (docIdAndVersion != null) {
|
||||
return new GetResult(searcher, docIdAndVersion);
|
||||
}
|
||||
// don't release the searcher on this path, it is the responsibility of the caller to call GetResult.release
|
||||
} catch (Exception e) {
|
||||
searcher.release();
|
||||
//TODO: A better exception goes here
|
||||
throw new EngineException(shardId(), "failed to load document", e);
|
||||
throw new EngineException(shardId(), "Couldn't resolve version", e);
|
||||
}
|
||||
searcher.release();
|
||||
return GetResult.NOT_EXISTS;
|
||||
|
@ -383,7 +379,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
|
||||
private void innerCreate(Create create, IndexWriter writer) throws IOException {
|
||||
synchronized (dirtyLock(create.uid())) {
|
||||
UidField uidField = create.uidField();
|
||||
HashedBytesRef versionKey = versionKey(create.uid());
|
||||
final long currentVersion;
|
||||
VersionValue versionValue = versionMap.get(versionKey);
|
||||
|
@ -462,7 +457,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
}
|
||||
}
|
||||
|
||||
uidField.version(updatedVersion);
|
||||
create.version(updatedVersion);
|
||||
|
||||
if (create.docs().size() > 1) {
|
||||
|
@ -508,7 +502,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
|
||||
private void innerIndex(Index index, IndexWriter writer) throws IOException {
|
||||
synchronized (dirtyLock(index.uid())) {
|
||||
UidField uidField = index.uidField();
|
||||
HashedBytesRef versionKey = versionKey(index.uid());
|
||||
final long currentVersion;
|
||||
VersionValue versionValue = versionMap.get(versionKey);
|
||||
|
@ -568,7 +561,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
updatedVersion = index.version();
|
||||
}
|
||||
|
||||
uidField.version(updatedVersion);
|
||||
index.version(updatedVersion);
|
||||
|
||||
if (currentVersion == -1) {
|
||||
|
@ -1313,19 +1305,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
return dirtyLock(uid.bytes());
|
||||
}
|
||||
|
||||
private long loadCurrentVersionFromIndex(Term uid) {
|
||||
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
|
||||
Searcher searcher = searcher();
|
||||
try {
|
||||
List<AtomicReaderContext> readers = searcher.reader().leaves();
|
||||
for (int i = 0; i < readers.size(); i++) {
|
||||
AtomicReaderContext readerContext = readers.get(i);
|
||||
long version = UidField.loadVersion(readerContext, uid);
|
||||
// either -2 (its there, but no version associated), or an actual version
|
||||
if (version != -1) {
|
||||
return version;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
return Versions.loadVersion(searcher.reader(), uid);
|
||||
} finally {
|
||||
searcher.release();
|
||||
}
|
||||
|
@ -1344,7 +1327,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
config.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
|
||||
config.setIndexDeletionPolicy(deletionPolicy);
|
||||
config.setMergeScheduler(mergeScheduler.newMergeScheduler());
|
||||
config.setMergePolicy(mergePolicyProvider.newMergePolicy());
|
||||
MergePolicy mergePolicy = mergePolicyProvider.newMergePolicy();
|
||||
// Give us the opportunity to upgrade old segments while performing
|
||||
// background merges
|
||||
mergePolicy = new IndexUpgraderMergePolicy(mergePolicy);
|
||||
config.setMergePolicy(mergePolicy);
|
||||
config.setSimilarity(similarityService.similarity());
|
||||
config.setRAMBufferSizeMB(indexingBufferSize.mbFrac());
|
||||
config.setTermIndexInterval(termIndexInterval);
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.ElasticSearchException;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -311,7 +311,7 @@ public class ShardGetService extends AbstractIndexShardComponent {
|
|||
private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, Engine.GetResult get, DocumentMapper docMapper) {
|
||||
Map<String, GetField> fields = null;
|
||||
BytesReference source = null;
|
||||
UidField.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
|
||||
Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
|
||||
FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields);
|
||||
if (fieldVisitor != null) {
|
||||
try {
|
||||
|
|
|
@ -577,7 +577,7 @@ public class DocumentMapper implements ToXContent {
|
|||
}
|
||||
}
|
||||
|
||||
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), source.timestamp(), source.ttl(), context.docs(), context.analyzer(),
|
||||
ParsedDocument doc = new ParsedDocument(context.uid(), context.version(), context.id(), context.type(), source.routing(), source.timestamp(), source.ttl(), context.docs(), context.analyzer(),
|
||||
context.source(), context.mappingsModified()).parent(source.parent());
|
||||
// reset the context to free up memory
|
||||
context.reset(null, null, null, null);
|
||||
|
|
|
@ -21,10 +21,10 @@ package org.elasticsearch.index.mapper;
|
|||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.lucene.all.AllEntries;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
|
@ -66,7 +66,7 @@ public class ParseContext {
|
|||
|
||||
private DocumentMapper.ParseListener listener;
|
||||
|
||||
private UidField uid;
|
||||
private Field uid, version;
|
||||
|
||||
private StringBuilder stringBuilder = new StringBuilder();
|
||||
|
||||
|
@ -104,6 +104,7 @@ public class ParseContext {
|
|||
}
|
||||
this.analyzer = null;
|
||||
this.uid = null;
|
||||
this.version = null;
|
||||
this.id = null;
|
||||
this.sourceToParse = source;
|
||||
this.source = source == null ? null : sourceToParse.source();
|
||||
|
@ -232,17 +233,25 @@ public class ParseContext {
|
|||
this.id = id;
|
||||
}
|
||||
|
||||
public UidField uid() {
|
||||
public Field uid() {
|
||||
return this.uid;
|
||||
}
|
||||
|
||||
/**
|
||||
* Really, just the uid mapper should set this.
|
||||
*/
|
||||
public void uid(UidField uid) {
|
||||
public void uid(Field uid) {
|
||||
this.uid = uid;
|
||||
}
|
||||
|
||||
public Field version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public void version(Field version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public boolean includeInAll(Boolean includeInAll, FieldMapper mapper) {
|
||||
return includeInAll(includeInAll, mapper.fieldType().indexed());
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ package org.elasticsearch.index.mapper;
|
|||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.apache.lucene.document.Field;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
@ -31,7 +31,7 @@ import java.util.List;
|
|||
*/
|
||||
public class ParsedDocument {
|
||||
|
||||
private final UidField uid;
|
||||
private final Field uid, version;
|
||||
|
||||
private final String id;
|
||||
|
||||
|
@ -53,8 +53,9 @@ public class ParsedDocument {
|
|||
|
||||
private String parent;
|
||||
|
||||
public ParsedDocument(UidField uid, String id, String type, String routing, long timestamp, long ttl, List<Document> documents, Analyzer analyzer, BytesReference source, boolean mappingsModified) {
|
||||
public ParsedDocument(Field uid, Field version, String id, String type, String routing, long timestamp, long ttl, List<Document> documents, Analyzer analyzer, BytesReference source, boolean mappingsModified) {
|
||||
this.uid = uid;
|
||||
this.version = version;
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.routing = routing;
|
||||
|
@ -66,10 +67,14 @@ public class ParsedDocument {
|
|||
this.mappingsModified = mappingsModified;
|
||||
}
|
||||
|
||||
public UidField uid() {
|
||||
public Field uid() {
|
||||
return this.uid;
|
||||
}
|
||||
|
||||
public Field version() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public String id() {
|
||||
return this.id;
|
||||
}
|
||||
|
@ -122,6 +127,7 @@ public class ParsedDocument {
|
|||
return mappingsModified;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Document ").append("uid[").append(uid).append("] doc [").append(documents).append("]");
|
||||
|
|
|
@ -21,11 +21,12 @@ package org.elasticsearch.index.mapper.internal;
|
|||
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.FieldType;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.index.FieldInfo;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
|
||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||
|
@ -42,7 +43,8 @@ import static org.elasticsearch.index.mapper.MapperBuilders.uid;
|
|||
*/
|
||||
public class UidFieldMapper extends AbstractFieldMapper<Uid> implements InternalMapper, RootMapper {
|
||||
|
||||
public static final String NAME = "_uid".intern();
|
||||
public static final String NAME = "_uid";
|
||||
public static final String VERSION = "_version";
|
||||
|
||||
public static final String CONTENT_TYPE = "_uid";
|
||||
|
||||
|
@ -50,22 +52,18 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements Internal
|
|||
public static final String NAME = UidFieldMapper.NAME;
|
||||
|
||||
public static final FieldType FIELD_TYPE = new FieldType(AbstractFieldMapper.Defaults.FIELD_TYPE);
|
||||
public static final FieldType NESTED_FIELD_TYPE = new FieldType(AbstractFieldMapper.Defaults.FIELD_TYPE);
|
||||
public static final FieldType NESTED_FIELD_TYPE;
|
||||
|
||||
static {
|
||||
FIELD_TYPE.setIndexed(true);
|
||||
FIELD_TYPE.setTokenized(false);
|
||||
FIELD_TYPE.setStored(true);
|
||||
FIELD_TYPE.setOmitNorms(true);
|
||||
FIELD_TYPE.setIndexOptions(FieldInfo.IndexOptions.DOCS_AND_FREQS_AND_POSITIONS); // we store payload (otherwise, we really need just docs)
|
||||
FIELD_TYPE.setIndexOptions(FieldInfo.IndexOptions.DOCS_ONLY);
|
||||
FIELD_TYPE.freeze();
|
||||
|
||||
NESTED_FIELD_TYPE.setIndexed(true);
|
||||
NESTED_FIELD_TYPE.setTokenized(false);
|
||||
NESTED_FIELD_TYPE = new FieldType(FIELD_TYPE);
|
||||
NESTED_FIELD_TYPE.setStored(false);
|
||||
NESTED_FIELD_TYPE.setOmitNorms(true);
|
||||
// we can set this to another index option when we move away from storing payload..
|
||||
//NESTED_FIELD_TYPE.setIndexOptions(FieldInfo.IndexOptions.DOCS_ONLY);
|
||||
NESTED_FIELD_TYPE.freeze();
|
||||
}
|
||||
}
|
||||
|
@ -88,7 +86,7 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements Internal
|
|||
|
||||
public static class TypeParser implements Mapper.TypeParser {
|
||||
@Override
|
||||
public Mapper.Builder parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
|
||||
public Mapper.Builder<?, ?> parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
|
||||
Builder builder = uid();
|
||||
for (Map.Entry<String, Object> entry : node.entrySet()) {
|
||||
String fieldName = Strings.toUnderscoreCase(entry.getKey());
|
||||
|
@ -102,10 +100,19 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements Internal
|
|||
}
|
||||
}
|
||||
|
||||
private ThreadLocal<UidField> fieldCache = new ThreadLocal<UidField>() {
|
||||
private static class UidAndVersion {
|
||||
final Field uid;
|
||||
final Field version;
|
||||
UidAndVersion() {
|
||||
uid = new Field(NAME, "", Defaults.FIELD_TYPE);
|
||||
version = new NumericDocValuesField(VERSION, -1L);
|
||||
}
|
||||
}
|
||||
|
||||
private final ThreadLocal<UidAndVersion> fieldCache = new ThreadLocal<UidAndVersion>() {
|
||||
@Override
|
||||
protected UidField initialValue() {
|
||||
return new UidField(names().indexName(), "", 0);
|
||||
protected UidAndVersion initialValue() {
|
||||
return new UidAndVersion();
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -158,12 +165,12 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements Internal
|
|||
// since we did not have the uid in the pre phase, we did not add it automatically to the nested docs
|
||||
// as they were created we need to make sure we add it to all the nested docs...
|
||||
if (context.docs().size() > 1) {
|
||||
UidField uidField = (UidField) context.rootDoc().getField(UidFieldMapper.NAME);
|
||||
final IndexableField uidField = context.rootDoc().getField(UidFieldMapper.NAME);
|
||||
assert uidField != null;
|
||||
// we need to go over the docs and add it...
|
||||
for (int i = 1; i < context.docs().size(); i++) {
|
||||
// we don't need to add it as a full uid field in nested docs, since we don't need versioning
|
||||
context.docs().get(i).add(new Field(UidFieldMapper.NAME, uidField.uid(), Defaults.NESTED_FIELD_TYPE));
|
||||
context.docs().get(i).add(new Field(UidFieldMapper.NAME, uidField.stringValue(), Defaults.NESTED_FIELD_TYPE));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -188,10 +195,13 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements Internal
|
|||
// so, caching uid stream and field is fine
|
||||
// since we don't do any mapping parsing without immediate indexing
|
||||
// and, when percolating, we don't index the uid
|
||||
UidField field = fieldCache.get();
|
||||
field.setUid(Uid.createUid(context.stringBuilder(), context.type(), context.id()));
|
||||
context.uid(field);
|
||||
return field; // version get updated by the engine
|
||||
UidAndVersion fields = fieldCache.get();
|
||||
fields.uid.setStringValue(Uid.createUid(context.stringBuilder(), context.type(), context.id()));
|
||||
context.uid(fields.uid);
|
||||
context.version(fields.version);
|
||||
// Add the _version here, parse will take care of adding the _uid
|
||||
context.doc().add(fields.version);
|
||||
return fields.uid; // version get updated by the engine
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -241,7 +251,7 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements Internal
|
|||
|
||||
@Override
|
||||
public void merge(Mapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
|
||||
AbstractFieldMapper fieldMergeWith = (AbstractFieldMapper) mergeWith;
|
||||
AbstractFieldMapper<?> fieldMergeWith = (AbstractFieldMapper<?>) mergeWith;
|
||||
// do nothing here, no merging, but also no exception
|
||||
if (!mergeContext.mergeFlags().simulate()) {
|
||||
// apply changeable values
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
||||
import org.elasticsearch.common.lucene.search.TermFilter;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -426,11 +425,7 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll {
|
|||
// we also rely on this for UidField#loadVersion
|
||||
|
||||
// this is a deeply nested field
|
||||
if (uidField.stringValue() != null) {
|
||||
nestedDoc.add(new Field(UidFieldMapper.NAME, uidField.stringValue(), UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
} else {
|
||||
nestedDoc.add(new Field(UidFieldMapper.NAME, ((UidField) uidField).uid(), UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
}
|
||||
nestedDoc.add(new Field(UidFieldMapper.NAME, uidField.stringValue(), UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
}
|
||||
// the type of the nested doc starts with __, so we can identify that its a nested one in filters
|
||||
// note, we don't prefix it with the type of the doc since it allows us to execute a nested query
|
||||
|
|
|
@ -0,0 +1,221 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.merge.policy;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.index.FieldInfo.DocValuesType;
|
||||
import org.apache.lucene.index.FieldInfo.IndexOptions;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.packed.GrowableWriter;
|
||||
import org.apache.lucene.util.packed.PackedInts;
|
||||
import org.elasticsearch.common.Numbers;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A {@link MergePolicy} that upgrades segments.
|
||||
* <p>
|
||||
* It can be useful to use the background merging process to upgrade segments,
|
||||
* for example when we perform internal changes that imply different index
|
||||
* options or when a user modifies his mapping in non-breaking ways: we could
|
||||
* imagine using this merge policy to be able to add doc values to fields after
|
||||
* the fact or, conversely, to remove them.
|
||||
* <p>
|
||||
* For now, this {@link MergePolicy} takes care of moving versions that used to
|
||||
* be stored as payloads to numeric doc values.
|
||||
*/
|
||||
public final class IndexUpgraderMergePolicy extends MergePolicy {
|
||||
|
||||
private final MergePolicy delegate;
|
||||
|
||||
/** @param delegate the merge policy to wrap */
|
||||
public IndexUpgraderMergePolicy(MergePolicy delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/** Return an "upgraded" view of the reader. */
|
||||
static AtomicReader filter(AtomicReader reader) throws IOException {
|
||||
final FieldInfos fieldInfos = reader.getFieldInfos();
|
||||
final FieldInfo versionInfo = fieldInfos.fieldInfo(UidFieldMapper.VERSION);
|
||||
if (versionInfo != null && versionInfo.hasDocValues()) {
|
||||
// the reader is a recent one, it has versions and they are stored
|
||||
// in a numeric doc values field
|
||||
return reader;
|
||||
}
|
||||
// The segment is an old one, load all versions in memory and hide
|
||||
// them behind a numeric doc values field
|
||||
final Terms terms = reader.terms(UidFieldMapper.NAME);
|
||||
if (terms == null || !terms.hasPayloads()) {
|
||||
// The segment doesn't have an _uid field or doesn't have payloads,
|
||||
// don't try to do anything clever. If any other segment has versions
|
||||
// all versions of this segment will be initialized to 0
|
||||
return reader;
|
||||
}
|
||||
final TermsEnum uids = terms.iterator(null);
|
||||
final GrowableWriter versions = new GrowableWriter(2, reader.maxDoc(), PackedInts.DEFAULT);
|
||||
DocsAndPositionsEnum dpe = null;
|
||||
for (BytesRef uid = uids.next(); uid != null; uid = uids.next()) {
|
||||
dpe = uids.docsAndPositions(reader.getLiveDocs(), dpe, DocsAndPositionsEnum.FLAG_PAYLOADS);
|
||||
assert dpe != null : "field has payloads";
|
||||
for (int doc = dpe.nextDoc(); doc != DocsEnum.NO_MORE_DOCS; doc = dpe.nextDoc()) {
|
||||
dpe.nextPosition();
|
||||
final BytesRef payload = dpe.getPayload();
|
||||
if (payload != null && payload.length == 8) {
|
||||
final long version = Numbers.bytesToLong(payload);
|
||||
versions.set(doc, version);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Build new field infos, doc values, and return a filter reader
|
||||
final FieldInfo newVersionInfo;
|
||||
if (versionInfo == null) {
|
||||
newVersionInfo = new FieldInfo(UidFieldMapper.VERSION, false, fieldInfos.size(), false, true, false,
|
||||
IndexOptions.DOCS_ONLY, DocValuesType.NUMERIC, DocValuesType.NUMERIC, Collections.<String, String>emptyMap());
|
||||
} else {
|
||||
newVersionInfo = new FieldInfo(UidFieldMapper.VERSION, versionInfo.isIndexed(), versionInfo.number,
|
||||
versionInfo.hasVectors(), versionInfo.omitsNorms(), versionInfo.hasPayloads(),
|
||||
versionInfo.getIndexOptions(), versionInfo.getDocValuesType(), versionInfo.getNormType(), versionInfo.attributes());
|
||||
}
|
||||
final ArrayList<FieldInfo> fieldInfoList = new ArrayList<FieldInfo>();
|
||||
for (FieldInfo info : fieldInfos) {
|
||||
if (info != versionInfo) {
|
||||
fieldInfoList.add(info);
|
||||
}
|
||||
}
|
||||
fieldInfoList.add(newVersionInfo);
|
||||
final FieldInfos newFieldInfos = new FieldInfos(fieldInfoList.toArray(new FieldInfo[fieldInfoList.size()]));
|
||||
final NumericDocValues versionValues = new NumericDocValues() {
|
||||
@Override
|
||||
public long get(int index) {
|
||||
return versions.get(index);
|
||||
}
|
||||
};
|
||||
return new FilterAtomicReader(reader) {
|
||||
@Override
|
||||
public FieldInfos getFieldInfos() {
|
||||
return newFieldInfos;
|
||||
}
|
||||
@Override
|
||||
public NumericDocValues getNumericDocValues(String field) throws IOException {
|
||||
if (UidFieldMapper.VERSION.equals(field)) {
|
||||
return versionValues;
|
||||
}
|
||||
return super.getNumericDocValues(field);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static class IndexUpgraderOneMerge extends OneMerge {
|
||||
|
||||
public IndexUpgraderOneMerge(List<SegmentInfoPerCommit> segments) {
|
||||
super(segments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AtomicReader> getMergeReaders() throws IOException {
|
||||
final List<AtomicReader> readers = super.getMergeReaders();
|
||||
ImmutableList.Builder<AtomicReader> newReaders = ImmutableList.builder();
|
||||
for (AtomicReader reader : readers) {
|
||||
newReaders.add(filter(reader));
|
||||
}
|
||||
return newReaders.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class IndexUpgraderMergeSpecification extends MergeSpecification {
|
||||
|
||||
@Override
|
||||
public void add(OneMerge merge) {
|
||||
super.add(new IndexUpgraderOneMerge(merge.segments));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String segString(Directory dir) {
|
||||
return "IndexUpgraderMergeSpec[" + super.segString(dir) + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static MergeSpecification upgradedMergeSpecification(MergeSpecification spec) {
|
||||
if (spec == null) {
|
||||
return null;
|
||||
}
|
||||
MergeSpecification upgradedSpec = new IndexUpgraderMergeSpecification();
|
||||
for (OneMerge merge : spec.merges) {
|
||||
upgradedSpec.add(merge);
|
||||
}
|
||||
return upgradedSpec;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findMerges(MergeTrigger mergeTrigger,
|
||||
SegmentInfos segmentInfos) throws IOException {
|
||||
return upgradedMergeSpecification(delegate.findMerges(mergeTrigger, segmentInfos));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
|
||||
int maxSegmentCount, Map<SegmentInfoPerCommit,Boolean> segmentsToMerge)
|
||||
throws IOException {
|
||||
return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos)
|
||||
throws IOException {
|
||||
return upgradedMergeSpecification(delegate.findForcedDeletesMerges(segmentInfos));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MergePolicy clone() {
|
||||
return new IndexUpgraderMergePolicy(delegate.clone());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean useCompoundFile(SegmentInfos segments,
|
||||
SegmentInfoPerCommit newSegment) throws IOException {
|
||||
return delegate.useCompoundFile(segments, newSegment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIndexWriter(IndexWriter writer) {
|
||||
delegate.setIndexWriter(writer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "(" + delegate + ")";
|
||||
}
|
||||
|
||||
}
|
|
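The merge policy above only rewrites segments when they happen to take part in a merge, so something has to install it on the IndexWriter for the upgrade to occur. A hedged sketch of that wiring follows, mirroring the RobinEngine change earlier in this diff; the analyzer and the TieredMergePolicy delegate are placeholders, not what the engine actually configures.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.util.Version;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;

// Wrap whatever merge policy would normally be used so that old segments get their
// _version moved from payloads to doc values the next time they are background-merged.
final class UpgraderWiringSketch {
    static IndexWriterConfig newConfig() {
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_42, new StandardAnalyzer(Version.LUCENE_42));
        MergePolicy delegate = new TieredMergePolicy();
        config.setMergePolicy(new IndexUpgraderMergePolicy(delegate));
        return config;
    }
}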
@ -304,7 +304,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
long startTime = System.nanoTime();
|
||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
||||
ParsedDocument doc = docMapper.parse(source);
|
||||
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().uid()), doc).startTime(startTime);
|
||||
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc).startTime(startTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -325,7 +325,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
long startTime = System.nanoTime();
|
||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
||||
ParsedDocument doc = docMapper.parse(source);
|
||||
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().uid()), doc).startTime(startTime);
|
||||
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc).startTime(startTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
|
@ -234,7 +234,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
|
|||
UidAndRoutingFieldsVisitor fieldsVisitor = new UidAndRoutingFieldsVisitor();
|
||||
context.reader().document(doc, fieldsVisitor);
|
||||
Uid uid = fieldsVisitor.uid();
|
||||
long version = UidField.loadVersion(context, new Term(UidFieldMapper.NAME, uid.toBytesRef()));
|
||||
final long version = Versions.loadVersion(context.reader(), new Term(UidFieldMapper.NAME, uid.toBytesRef()));
|
||||
docsToPurge.add(new DocToPurge(uid.type(), uid.id(), version, fieldsVisitor.routing()));
|
||||
} catch (Exception e) {
|
||||
logger.trace("failed to collect doc", e);
|
||||
|
|
|
@ -22,13 +22,14 @@ package org.elasticsearch.search.fetch.version;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.common.lucene.uid.UidField;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
import org.elasticsearch.search.SearchParseElement;
|
||||
import org.elasticsearch.search.fetch.FetchSubPhase;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -60,10 +61,16 @@ public class VersionFetchSubPhase implements FetchSubPhase {
|
|||
// it might make sense to cache the TermDocs on a shared fetch context and just skip here)
|
||||
// it is going to mean we work on the high level multi reader and not the lower level reader as is
|
||||
// the case below...
|
||||
long version = UidField.loadVersion(
|
||||
hitContext.readerContext(),
|
||||
new Term(UidFieldMapper.NAME, hitContext.fieldVisitor().uid().toBytesRef())
|
||||
);
|
||||
long version;
|
||||
try {
|
||||
version = Versions.loadVersion(
|
||||
hitContext.readerContext().reader(),
|
||||
new Term(UidFieldMapper.NAME, hitContext.fieldVisitor().uid().toBytesRef())
|
||||
);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticSearchException("Could not query index for _version", e);
|
||||
}
|
||||
|
||||
if (version < 0) {
|
||||
version = -1;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,207 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.store.Lock;
|
||||
|
||||
import com.google.common.collect.HashMultiset;
|
||||
import com.google.common.collect.Multiset;
|
||||
|
||||
/** A mock {@link Directory} impl that tracks open files. */
|
||||
public class MockDirectoryWrapper extends Directory {
|
||||
|
||||
private final Directory delegate;
|
||||
private final Multiset<String> openFiles;
|
||||
|
||||
public MockDirectoryWrapper(Directory delegate) {
|
||||
this.delegate = delegate;
|
||||
this.openFiles = HashMultiset.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegate.close();
|
||||
if (!openFiles.isEmpty()) {
|
||||
throw new IllegalStateException("There are still open files!!! " + openFiles);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
// wrap the output so that we can track when it gets closed
|
||||
        return new MockIndexOutput(name, delegate.createOutput(name, context));
    }

    @Override
    public void deleteFile(String name) throws IOException {
        delegate.deleteFile(name);
    }

    @Override
    public boolean fileExists(String name) throws IOException {
        return delegate.fileExists(name);
    }

    @Override
    public long fileLength(String name) throws IOException {
        return delegate.fileLength(name);
    }

    @Override
    public String[] listAll() throws IOException {
        return delegate.listAll();
    }

    @Override
    public Lock makeLock(String name) {
        return delegate.makeLock(name);
    }

    @Override
    public IndexInput openInput(String name, IOContext context) throws IOException {
        return new MockIndexInput(name, delegate.openInput(name, context));
    }

    @Override
    public void sync(Collection<String> files) throws IOException {
        delegate.sync(files);
    }

    private class MockIndexInput extends IndexInput {

        private final String name;
        private final IndexInput delegate;

        protected MockIndexInput(String name, IndexInput delegate) {
            super(name);
            this.name = name;
            openFiles.add(name);
            this.delegate = delegate;
        }

        @Override
        public void close() throws IOException {
            try {
                delegate.close();
            } finally {
                openFiles.remove(name);
            }
        }

        @Override
        public long getFilePointer() {
            return delegate.getFilePointer();
        }

        @Override
        public long length() {
            return delegate.length();
        }

        @Override
        public void seek(long pointer) throws IOException {
            delegate.seek(pointer);
        }

        @Override
        public byte readByte() throws IOException {
            return delegate.readByte();
        }

        @Override
        public void readBytes(byte[] b, int o, int l) throws IOException {
            delegate.readBytes(b, o, l);
        }

        @Override
        public IndexInput clone() {
            // Clones don't need to be closed, nothing to track
            return delegate.clone();
        }

    }

    private class MockIndexOutput extends IndexOutput {

        private final String name;
        private final IndexOutput delegate;

        MockIndexOutput(String name, IndexOutput delegate) {
            super();
            this.name = name;
            openFiles.add(name);
            this.delegate = delegate;
        }

        @Override
        public void close() throws IOException {
            try {
                delegate.close();
            } finally {
                openFiles.remove(name);
            }
        }

        @Override
        public void flush() throws IOException {
            delegate.flush();
        }

        @Override
        public long getFilePointer() {
            return delegate.getFilePointer();
        }

        @Override
        public long length() throws IOException {
            return delegate.length();
        }

        @Override
        @Deprecated
        public void seek(long pointer) throws IOException {
            delegate.seek(pointer);
        }

        @Override
        public void writeByte(byte b) throws IOException {
            delegate.writeByte(b);
        }

        @Override
        public void writeBytes(byte[] b, int o, int l) throws IOException {
            delegate.writeBytes(b, o, l);
        }

    }

    @Override
    public String toString() {
        return "MockDirectoryWrapper[" + delegate + "]";
    }

}
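A quick usage sketch for the wrapper above (hypothetical test code, not part of the patch): every file opened through the wrapper is counted in the openFiles multiset, and close() fails if anything was left open.

// Hypothetical snippet; the directory, file name and written byte are arbitrary.
Directory dir = new MockDirectoryWrapper(new RAMDirectory());
IndexOutput out = dir.createOutput("test.bin", IOContext.DEFAULT);
out.writeByte((byte) 42);
out.close();   // "test.bin" is removed from the open-files multiset
dir.close();   // would throw IllegalStateException if the output were still open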
@@ -21,13 +21,14 @@ package org.elasticsearch.benchmark.common.lucene.uidscan;

import jsr166y.ThreadLocalRandom;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.SizeValue;

import java.io.File;
@@ -53,12 +54,13 @@ public class LuceneUidScanBenchmark {

        System.out.println("Indexing " + INDEX_COUNT + " docs...");
        for (long i = startUid; i < LIMIT; i++) {
            Document doc = new Document();
            doc.add(new UidField("_uid", Long.toString(i), i));
            doc.add(new StringField("_uid", Long.toString(i), Store.NO));
            doc.add(new NumericDocValuesField("_version", i));
            writer.addDocument(doc);
        }
        System.out.println("Done indexing, took " + watch.stop().lastTaskTime());

        final IndexReader reader = IndexReader.open(writer, true);
        final IndexReader reader = DirectoryReader.open(writer, true);

        final CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
        Thread[] threads = new Thread[NUMBER_OF_THREADS];
@@ -69,18 +71,8 @@ public class LuceneUidScanBenchmark {

                        try {
                            for (long i = 0; i < SCAN_COUNT; i++) {
                                long id = startUid + (Math.abs(ThreadLocalRandom.current().nextInt()) % INDEX_COUNT);
                                DocsAndPositionsEnum uid = MultiFields.getTermPositionsEnum(reader,
                                        MultiFields.getLiveDocs(reader),
                                        "_uid",
                                        new BytesRef(Long.toString(id)));
                                uid.nextDoc();
                                uid.nextPosition();
                                if (uid.getPayload() == null) {
                                    System.err.println("no payload...");
                                    break;
                                }
                                BytesRef payload = uid.getPayload();
                                if (Numbers.bytesToLong(BytesRef.deepCopyOf(payload).bytes) != id) {
                                final long version = Versions.loadVersion(reader, new Term("_uid", Long.toString(id)));
                                if (version != id) {
                                    System.err.println("wrong id...");
                                    break;
                                }
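The benchmark's manual payload scan above collapses into a single Versions.loadVersion call. A rough per-segment sketch of the lookup this implies is shown below; it is illustrative only (the actual org.elasticsearch.common.lucene.uid.Versions class is not part of this hunk), it reuses the Lucene and Elasticsearch imports already listed in this diff, and it glosses over nested documents, which the tests later in the patch exercise. The assumed order is: doc values first, legacy payload only as a fallback.

// Illustrative sketch, not the real implementation.
static long loadVersion(AtomicReader reader, Term uid) throws IOException {
    Terms terms = reader.terms(uid.field());
    if (terms == null) {
        return Versions.NOT_FOUND;
    }
    TermsEnum termsEnum = terms.iterator(null);
    if (!termsEnum.seekExact(uid.bytes(), true)) {
        return Versions.NOT_FOUND;
    }
    // New layout: _version lives in numeric doc values.
    NumericDocValues versions = reader.getNumericDocValues("_version");
    if (versions != null) {
        DocsEnum docs = termsEnum.docs(reader.getLiveDocs(), null);
        int docId = docs.nextDoc();
        return docId == DocIdSetIterator.NO_MORE_DOCS ? Versions.NOT_FOUND : versions.get(docId);
    }
    // Old layout: version encoded as a payload on the _uid term.
    DocsAndPositionsEnum dpe = termsEnum.docsAndPositions(reader.getLiveDocs(), null, DocsAndPositionsEnum.FLAG_PAYLOADS);
    if (dpe == null || dpe.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
        return Versions.NOT_FOUND;
    }
    dpe.nextPosition();
    BytesRef payload = dpe.getPayload();
    return payload == null ? Versions.NOT_SET : Numbers.bytesToLong(BytesRef.deepCopyOf(payload).bytes);
}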
@@ -1,90 +0,0 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.test.unit.common.lucene.uid;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.*;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.hamcrest.MatcherAssert;
import org.testng.annotations.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

/**
 *
 */
public class UidFieldTests {

    @Test
    public void testUidField() throws Exception {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));

        DirectoryReader directoryReader = DirectoryReader.open(writer, true);
        AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(directoryReader);
        MatcherAssert.assertThat(UidField.loadVersion(atomicReader.getContext(), new Term("_uid", "1")), equalTo(-1l));

        Document doc = new Document();
        doc.add(new Field("_uid", "1", UidFieldMapper.Defaults.FIELD_TYPE));
        writer.addDocument(doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        atomicReader = SlowCompositeReaderWrapper.wrap(directoryReader);
        assertThat(UidField.loadVersion(atomicReader.getContext(), new Term("_uid", "1")), equalTo(-2l));
        assertThat(UidField.loadDocIdAndVersion(atomicReader.getContext(), new Term("_uid", "1")).version, equalTo(-2l));

        doc = new Document();
        doc.add(new UidField("_uid", "1", 1));
        writer.updateDocument(new Term("_uid", "1"), doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        atomicReader = SlowCompositeReaderWrapper.wrap(directoryReader);
        assertThat(UidField.loadVersion(atomicReader.getContext(), new Term("_uid", "1")), equalTo(1l));
        assertThat(UidField.loadDocIdAndVersion(atomicReader.getContext(), new Term("_uid", "1")).version, equalTo(1l));

        doc = new Document();
        UidField uid = new UidField("_uid", "1", 2);
        doc.add(uid);
        writer.updateDocument(new Term("_uid", "1"), doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        atomicReader = SlowCompositeReaderWrapper.wrap(directoryReader);
        assertThat(UidField.loadVersion(atomicReader.getContext(), new Term("_uid", "1")), equalTo(2l));
        assertThat(UidField.loadDocIdAndVersion(atomicReader.getContext(), new Term("_uid", "1")).version, equalTo(2l));

        // test reuse of uid field
        doc = new Document();
        uid.version(3);
        doc.add(uid);
        writer.updateDocument(new Term("_uid", "1"), doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        atomicReader = SlowCompositeReaderWrapper.wrap(directoryReader);
        assertThat(UidField.loadVersion(atomicReader.getContext(), new Term("_uid", "1")), equalTo(3l));
        assertThat(UidField.loadDocIdAndVersion(atomicReader.getContext(), new Term("_uid", "1")).version, equalTo(3l));

        writer.deleteDocuments(new Term("_uid", "1"));
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        atomicReader = SlowCompositeReaderWrapper.wrap(directoryReader);
        assertThat(UidField.loadVersion(atomicReader.getContext(), new Term("_uid", "1")), equalTo(-1l));
        assertThat(UidField.loadDocIdAndVersion(atomicReader.getContext(), new Term("_uid", "1")), nullValue());
    }
}
@@ -0,0 +1,262 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.test.unit.common.lucene.uid;

import org.apache.lucene.document.NumericDocValuesField;

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.*;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.*;
import org.apache.lucene.index.FieldInfo.IndexOptions;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.MockDirectoryWrapper;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
import org.hamcrest.MatcherAssert;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

public class VersionsTests {

    @Test
    public void testVersions() throws Exception {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));

        DirectoryReader directoryReader = DirectoryReader.open(writer, true);
        MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));

        Document doc = new Document();
        doc.add(new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE));
        writer.addDocument(doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_SET));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(Versions.NOT_SET));

        doc = new Document();
        doc.add(new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE));
        doc.add(new NumericDocValuesField(UidFieldMapper.VERSION, 1));
        writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(1l));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(1l));

        doc = new Document();
        Field uid = new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE);
        Field version = new NumericDocValuesField(UidFieldMapper.VERSION, 2);
        doc.add(uid);
        doc.add(version);
        writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(2l));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(2l));

        // test reuse of uid field
        doc = new Document();
        version.setLongValue(3);
        doc.add(uid);
        doc.add(version);
        writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(3l));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(3l));

        writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1"));
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue());
    }

    @Test
    public void testNestedDocuments() throws IOException {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));

        List<Document> docs = new ArrayList<Document>();
        for (int i = 0; i < 4; ++i) {
            // Nested
            Document doc = new Document();
            doc.add(new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
            docs.add(doc);
        }
        // Root
        Document doc = new Document();
        doc.add(new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE));
        NumericDocValuesField version = new NumericDocValuesField(UidFieldMapper.VERSION, 5L);
        doc.add(version);
        docs.add(doc);

        writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
        DirectoryReader directoryReader = DirectoryReader.open(writer, true);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5l));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5l));

        version.setLongValue(6L);
        writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
        version.setLongValue(7L);
        writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(7l));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(7l));

        writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1"));
        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
        assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue());
    }

    @Test
    public void testBackwardCompatibility() throws IOException {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));

        DirectoryReader directoryReader = DirectoryReader.open(writer, true);
        MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));

        Document doc = new Document();
        UidField uidAndVersion = new UidField("1", 1L);
        doc.add(uidAndVersion);
        writer.addDocument(doc);

        uidAndVersion.uid = "2";
        uidAndVersion.version = 2;
        writer.addDocument(doc);
        writer.commit();

        directoryReader = DirectoryReader.openIfChanged(directoryReader);
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(1l));
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "2")), equalTo(2l));
        assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "3")), equalTo(Versions.NOT_FOUND));
    }

    // This is how versions used to be encoded
    private static class UidField extends Field {
        private static final FieldType FIELD_TYPE = new FieldType();
        static {
            FIELD_TYPE.setTokenized(true);
            FIELD_TYPE.setIndexed(true);
            FIELD_TYPE.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
            FIELD_TYPE.setStored(true);
            FIELD_TYPE.freeze();
        }
        String uid;
        long version;
        UidField(String uid, long version) {
            super(UidFieldMapper.NAME, uid, FIELD_TYPE);
            this.uid = uid;
            this.version = version;
        }
        @Override
        public TokenStream tokenStream(Analyzer analyzer) throws IOException {
            return new TokenStream() {
                boolean finished = true;
                final CharTermAttribute term = addAttribute(CharTermAttribute.class);
                final PayloadAttribute payload = addAttribute(PayloadAttribute.class);
                @Override
                public boolean incrementToken() throws IOException {
                    if (finished) {
                        return false;
                    }
                    term.setEmpty().append(uid);
                    payload.setPayload(new BytesRef(Numbers.longToBytes(version)));
                    finished = true;
                    return true;
                }
                @Override
                public void reset() throws IOException {
                    finished = false;
                }
            };
        }
    }

    @Test
    public void testMergingOldIndices() throws Exception {
        final IndexWriterConfig iwConf = new IndexWriterConfig(Lucene.VERSION, new KeywordAnalyzer());
        iwConf.setMergePolicy(new IndexUpgraderMergePolicy(iwConf.getMergePolicy()));
        final Directory dir = new MockDirectoryWrapper(new RAMDirectory());
        final IndexWriter iw = new IndexWriter(dir, iwConf);

        // 1st segment, no _version
        Document document = new Document();
        StringField uid = new StringField(UidFieldMapper.NAME, "1", Store.YES);
        document.add(uid);
        iw.addDocument(document);
        uid.setStringValue("2");
        iw.addDocument(document);
        iw.commit();

        // 2nd segment, old layout
        document = new Document();
        UidField uidAndVersion = new UidField("3", 3L);
        document.add(uidAndVersion);
        iw.addDocument(document);
        uidAndVersion.uid = "4";
        uidAndVersion.version = 4L;
        iw.addDocument(document);
        iw.commit();

        // 3rd segment new layout
        document = new Document();
        uid.setStringValue("5");
        Field version = new NumericDocValuesField(UidFieldMapper.VERSION, 5L);
        document.add(uid);
        document.add(version);
        iw.addDocument(document);
        uid.setStringValue("6");
        version.setLongValue(6L);
        iw.addDocument(document);
        iw.commit();

        final Map<String, Long> expectedVersions = ImmutableMap.<String, Long>builder()
                .put("1", 0L).put("2", 0L).put("3", 0L).put("4", 4L).put("5", 5L).put("6", 6L).build();

        // Force merge and check versions
        iw.forceMerge(1);
        final AtomicReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory()));
        final NumericDocValues versions = ir.getNumericDocValues(UidFieldMapper.VERSION);
        assertThat(versions, notNullValue());
        for (int i = 0; i < ir.maxDoc(); ++i) {
            final String uidValue = ir.document(i).get(UidFieldMapper.NAME);
            final long expectedVersion = expectedVersions.get(uidValue);
            assertThat(versions.get(i), equalTo(expectedVersion));
        }

        iw.close();
        assertThat(IndexWriter.isLocked(iw.getDirectory()), is(false));
        ir.close();
        dir.close();
    }
}
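For context on the legacy layout exercised by testBackwardCompatibility and testMergingOldIndices above: the old UidField helper emits the version as an 8-byte payload attached to the single _uid token. A tiny round-trip sketch follows, assuming only that the Numbers.longToBytes and Numbers.bytesToLong helpers used elsewhere in this patch are inverses of each other.

// Sketch of the legacy payload encoding (values are arbitrary).
long version = 42L;
BytesRef payload = new BytesRef(Numbers.longToBytes(version));              // what the old TokenStream emits
long decoded = Numbers.bytesToLong(BytesRef.deepCopyOf(payload).bytes);     // what payload-based readers decode
assert decoded == version;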
@@ -22,6 +22,7 @@ package org.elasticsearch.test.unit.index.engine;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term;

@@ -29,7 +30,6 @@ import org.apache.lucene.search.TermQuery;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.UidField;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;

@@ -38,6 +38,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;

import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;

@@ -125,9 +126,10 @@ public abstract class AbstractSimpleEngineTests {


    private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, Document document, Analyzer analyzer, BytesReference source, boolean mappingsModified) {
        UidField uidField = new UidField("_uid", uid, 0);
        Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
        Field versionField = new NumericDocValuesField("_version", 0);
        document.add(uidField);
        return new ParsedDocument(uidField, id, type, routing, timestamp, ttl, Arrays.asList(document), analyzer, source, mappingsModified);
        return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), analyzer, source, mappingsModified);
    }

    protected Store createStore() throws IOException {