Core: reuse Lucene's TermsEnum for faster _uid/version lookup during indexing

Reusing Lucene's TermsEnum for _uid/version lookups gives a small
indexing (updates) speedup and brings us closer to not having
to spend RAM on bloom filters.

Closes #6212
mikemccand 2014-05-31 17:38:48 -04:00
parent f51a09d8f7
commit 7552b69b1f
3 changed files with 201 additions and 69 deletions
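In essence: instead of pulling a fresh TermsEnum from each segment's terms dictionary on every _uid lookup, keep one enum per segment and reuse it across seekExact() calls, so repeated lookups skip the per-lookup enum construction. Below is a minimal standalone sketch of that idea using the Lucene 4.x API this commit targets; it is not the commit's code, and the index directory and uid values are hypothetical.

import java.io.File;
import java.io.IOException;

import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;

public class ReuseTermsEnumSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical index location:
        try (DirectoryReader reader = DirectoryReader.open(FSDirectory.open(new File("/tmp/idx")))) {
            for (AtomicReaderContext leaf : reader.leaves()) {
                Terms terms = leaf.reader().terms("_uid");
                if (terms == null) {
                    continue;
                }
                TermsEnum termsEnum = terms.iterator(null); // created once per segment...
                // ...then reused for many primary-key lookups; each seekExact
                // reuses the enum's internal state instead of rebuilding it:
                System.out.println(termsEnum.seekExact(new BytesRef("type#1")));
                System.out.println(termsEnum.seekExact(new BytesRef("type#2")));
            }
        }
    }
}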

src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java

@@ -0,0 +1,150 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.lucene.uid;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DocsAndPositionsEnum;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
/** Utility class to do efficient primary-key (only 1 doc contains the
* given term) lookups by segment, re-using the enums. This class is
* not thread safe, so it is the caller's job to create and use one
* instance of this per thread. Do not use this if a term may appear
* in more than one document! It will only return the first one it
* finds. */
final class PerThreadIDAndVersionLookup {

    private final AtomicReaderContext[] readerContexts;
    private final TermsEnum[] termsEnums;
    private final DocsEnum[] docsEnums;
    // Only used for back compat, to lookup a version from payload:
    private final DocsAndPositionsEnum[] posEnums;
    private final Bits[] liveDocs;
    private final NumericDocValues[] versions;
    private final int numSegs;
    private final boolean hasDeletions;
    private final boolean[] hasPayloads;

    public PerThreadIDAndVersionLookup(IndexReader r) throws IOException {
        List<AtomicReaderContext> leaves = new ArrayList<>(r.leaves());

        readerContexts = leaves.toArray(new AtomicReaderContext[leaves.size()]);
        termsEnums = new TermsEnum[leaves.size()];
        docsEnums = new DocsEnum[leaves.size()];
        posEnums = new DocsAndPositionsEnum[leaves.size()];
        liveDocs = new Bits[leaves.size()];
        versions = new NumericDocValues[leaves.size()];
        hasPayloads = new boolean[leaves.size()];
        int numSegs = 0;
        boolean hasDeletions = false;

        // iterate backwards to optimize for the frequently updated documents
        // which are likely to be in the last segments
        for (int i = leaves.size() - 1; i >= 0; i--) {
            AtomicReaderContext readerContext = leaves.get(i);
            Fields fields = readerContext.reader().fields();
            if (fields != null) {
                Terms terms = fields.terms(UidFieldMapper.NAME);
                if (terms != null) {
                    readerContexts[numSegs] = readerContext;
                    hasPayloads[numSegs] = terms.hasPayloads();
                    termsEnums[numSegs] = terms.iterator(null);
                    assert termsEnums[numSegs] != null;
                    liveDocs[numSegs] = readerContext.reader().getLiveDocs();
                    hasDeletions |= readerContext.reader().hasDeletions();
                    versions[numSegs] = readerContext.reader().getNumericDocValues(VersionFieldMapper.NAME);
                    numSegs++;
                }
            }
        }

        this.numSegs = numSegs;
        this.hasDeletions = hasDeletions;
    }
    /** Return null if id is not found. */
    public DocIdAndVersion lookup(BytesRef id) throws IOException {
        for (int seg = 0; seg < numSegs; seg++) {
            if (termsEnums[seg].seekExact(id)) {
                NumericDocValues segVersions = versions[seg];
                if (segVersions != null || hasPayloads[seg] == false) {
                    // Use NDV to retrieve the version, in which case we only need DocsEnum:
                    // there may be more than one matching docID, in the case of nested docs, so we want the last one:
                    DocsEnum docs = docsEnums[seg] = termsEnums[seg].docs(liveDocs[seg], docsEnums[seg], 0);
                    int docID = DocsEnum.NO_MORE_DOCS;
                    for (int d = docs.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = docs.nextDoc()) {
                        docID = d;
                    }
                    if (docID != DocsEnum.NO_MORE_DOCS) {
                        if (segVersions != null) {
                            return new DocIdAndVersion(docID, segVersions.get(docID), readerContexts[seg]);
                        } else {
                            // _uid found, but no doc values and no payloads
                            return new DocIdAndVersion(docID, Versions.NOT_SET, readerContexts[seg]);
                        }
                    } else {
                        assert hasDeletions;
                        continue;
                    }
                }

                // ... but used to be stored as payloads; in this case we must use DocsAndPositionsEnum
                DocsAndPositionsEnum dpe = posEnums[seg] = termsEnums[seg].docsAndPositions(liveDocs[seg], posEnums[seg], DocsAndPositionsEnum.FLAG_PAYLOADS);
                assert dpe != null; // terms has payloads
                int docID = DocsEnum.NO_MORE_DOCS;
                for (int d = dpe.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = dpe.nextDoc()) {
                    docID = d;
                    dpe.nextPosition();
                    final BytesRef payload = dpe.getPayload();
                    if (payload != null && payload.length == 8) {
                        // TODO: does this break the nested docs case? we are not returning the last matching docID here?
                        return new DocIdAndVersion(d, Numbers.bytesToLong(payload), readerContexts[seg]);
                    }
                }
            }
        }
        return null;
    }

    // TODO: add reopen method to carry over re-used enums...?
}

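As a usage sketch, assuming a caller in the same package (the helper class and method name below are hypothetical; the real entry point is Versions.loadDocIdAndVersion in the next file): since the class is not thread safe, one instance is created per thread and reused across many uid lookups.

package org.elasticsearch.common.lucene.uid;

import java.io.IOException;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.BytesRef;

// Hypothetical same-package caller, for illustration only:
final class LookupUsageSketch {
    /** Resolves a uid's version, or Versions.NOT_FOUND if no live doc matches. */
    static long versionFor(IndexReader reader, BytesRef uid) throws IOException {
        // Created inline here for brevity; in real use this instance is cached
        // per thread (Versions does so via CloseableThreadLocal) and reused.
        PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(reader);
        Versions.DocIdAndVersion result = lookup.lookup(uid);
        return result == null ? Versions.NOT_FOUND : result.version;
    }
}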
src/main/java/org/elasticsearch/common/lucene/uid/Versions.java

@@ -19,9 +19,15 @@
 package org.elasticsearch.common.lucene.uid;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.lucene.index.*;
+import org.apache.lucene.index.IndexReader.ReaderClosedListener;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CloseableThreadLocal;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -29,18 +35,57 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.mapper.internal.UidFieldMapper;
 import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
 
-import java.io.IOException;
-import java.util.List;
-
 /** Utility class to resolve the Lucene doc ID and version for a given uid. */
 public class Versions {
 
     public static final long MATCH_ANY = -3L; // Version was not specified by the user
+
+    // TODO: can we remove this now? rolling upgrades only need to handle prev (not older than that) version...?
+
     // the value for MATCH_ANY before ES 1.2.0 - will be removed
     public static final long MATCH_ANY_PRE_1_2_0 = 0L;
 
     public static final long NOT_FOUND = -1L;
     public static final long NOT_SET = -2L;
 
+    // TODO: is there somewhere else we can store these?
+    private static final ConcurrentHashMap<IndexReader,CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = new ConcurrentHashMap<>();
+
+    // Evict this reader from lookupStates once it's closed:
+    private static final ReaderClosedListener removeLookupState = new ReaderClosedListener() {
+        @Override
+        public void onClose(IndexReader reader) {
+            CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(reader);
+            if (ctl != null) {
+                ctl.close();
+            }
+        }
+    };
+
+    private static PerThreadIDAndVersionLookup getLookupState(IndexReader reader) throws IOException {
+        CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(reader);
+        if (ctl == null) {
+            // First time we are seeing this reader; make a new CTL:
+            ctl = new CloseableThreadLocal<PerThreadIDAndVersionLookup>();
+            CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(reader, ctl);
+            if (other == null) {
+                // Our CTL won; we must remove it when the reader is closed:
+                reader.addReaderClosedListener(removeLookupState);
+            } else {
+                // Another thread beat us to it: just use their CTL:
+                ctl = other;
+            }
+        }
+
+        PerThreadIDAndVersionLookup lookupState = ctl.get();
+        if (lookupState == null) {
+            lookupState = new PerThreadIDAndVersionLookup(reader);
+            ctl.set(lookupState);
+        }
+
+        return lookupState;
+    }
+
     public static void writeVersion(long version, StreamOutput out) throws IOException {
         if (out.getVersion().before(Version.V_1_2_0) && version == MATCH_ANY) {
             // we have to send out a value the node will understand
@@ -105,17 +150,8 @@ public class Versions {
      * </ul>
      */
     public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
-        // iterate backwards to optimize for the frequently updated documents
-        // which are likely to be in the last segments
-        final List<AtomicReaderContext> leaves = reader.leaves();
-        for (int i = leaves.size() - 1; i >= 0; --i) {
-            final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(leaves.get(i), term);
-            if (docIdAndVersion != null) {
-                assert docIdAndVersion.version != NOT_FOUND;
-                return docIdAndVersion;
-            }
-        }
-        return null;
+        assert term.field().equals(UidFieldMapper.NAME);
+        return getLookupState(reader).lookup(term.bytes());
     }
     /**
@@ -129,58 +165,4 @@ public class Versions {
         final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
         return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
     }
-
-    /** Same as {@link #loadDocIdAndVersion(IndexReader, Term)} but operates directly on a reader context. */
-    public static DocIdAndVersion loadDocIdAndVersion(AtomicReaderContext readerContext, Term term) throws IOException {
-        assert term.field().equals(UidFieldMapper.NAME);
-        final AtomicReader reader = readerContext.reader();
-        final Bits liveDocs = reader.getLiveDocs();
-        final Terms terms = reader.terms(UidFieldMapper.NAME);
-        assert terms != null : "All segments must have a _uid field, but " + reader + " doesn't";
-        final TermsEnum termsEnum = terms.iterator(null);
-        if (!termsEnum.seekExact(term.bytes())) {
-            return null;
-        }
-
-        // Versions are stored as doc values...
-        final NumericDocValues versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
-        if (versions != null || !terms.hasPayloads()) {
-            // only the last doc that matches the _uid is interesting here: if it is deleted, then there is
-            // no match otherwise previous docs are necessarily either deleted or nested docs
-            final DocsEnum docs = termsEnum.docs(null, null);
-            int docID = DocsEnum.NO_MORE_DOCS;
-            for (int d = docs.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = docs.nextDoc()) {
-                docID = d;
-            }
-            assert docID != DocsEnum.NO_MORE_DOCS; // would mean that the term exists but has no match at all
-            if (liveDocs != null && !liveDocs.get(docID)) {
-                return null;
-            } else if (versions != null) {
-                return new DocIdAndVersion(docID, versions.get(docID), readerContext);
-            } else {
-                // _uid found, but no doc values and no payloads
-                return new DocIdAndVersion(docID, NOT_SET, readerContext);
-            }
-        }
-
-        // ... but used to be stored as payloads
-        final DocsAndPositionsEnum dpe = termsEnum.docsAndPositions(liveDocs, null, DocsAndPositionsEnum.FLAG_PAYLOADS);
-        assert dpe != null; // terms has payloads
-        int docID = DocsEnum.NO_MORE_DOCS;
-        for (int d = dpe.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = dpe.nextDoc()) {
-            docID = d;
-            dpe.nextPosition();
-            final BytesRef payload = dpe.getPayload();
-            if (payload != null && payload.length == 8) {
-                return new DocIdAndVersion(d, Numbers.bytesToLong(payload), readerContext);
-            }
-        }
-
-        if (docID == DocsEnum.NO_MORE_DOCS) {
-            return null;
-        } else {
-            return new DocIdAndVersion(docID, NOT_SET, readerContext);
-        }
-    }
 }

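The getLookupState plumbing above is a small lock-free caching pattern worth calling out: a ConcurrentHashMap keyed by the reader, putIfAbsent to decide which thread's CloseableThreadLocal wins the publication race, and a close listener to evict the entry so per-thread state cannot outlive its reader. A stripped-down sketch of the same pattern follows; Key and Resource are hypothetical stand-ins, not types from this commit.

import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the per-reader key and per-thread state:
final class Key {}
final class Resource {}

final class PerKeyThreadLocalCache {
    private static final ConcurrentHashMap<Key, ThreadLocal<Resource>> states = new ConcurrentHashMap<>();

    static Resource get(Key key) {
        ThreadLocal<Resource> tl = states.get(key);
        if (tl == null) {
            tl = new ThreadLocal<>();
            // putIfAbsent makes the race benign: at most one ThreadLocal
            // is ever published for a given key.
            ThreadLocal<Resource> other = states.putIfAbsent(key, tl);
            if (other != null) {
                tl = other; // another thread won the race; use its ThreadLocal
            }
        }
        Resource r = tl.get();
        if (r == null) {
            r = new Resource(); // created lazily, once per thread per key
            tl.set(r);
        }
        return r;
    }

    // Mirrors the ReaderClosedListener: drop the entry when the key's owner closes.
    static void evict(Key key) {
        states.remove(key);
    }
}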
src/main/java/org/elasticsearch/index/codec/postingsformat/PostingFormats.java

@@ -67,7 +67,7 @@ public class PostingFormats {
         for (String luceneName : PostingsFormat.availablePostingsFormats()) {
             buildInPostingFormatsX.put(luceneName, new PreBuiltPostingsFormatProvider.Factory(PostingsFormat.forName(luceneName)));
         }
-        final Elasticsearch090PostingsFormat defaultFormat = new Elasticsearch090PostingsFormat();
+        final PostingsFormat defaultFormat = new Elasticsearch090PostingsFormat();
         buildInPostingFormatsX.put("direct", new PreBuiltPostingsFormatProvider.Factory("direct", PostingsFormat.forName("Direct")));
         buildInPostingFormatsX.put("memory", new PreBuiltPostingsFormatProvider.Factory("memory", PostingsFormat.forName("Memory")));
         // LUCENE UPGRADE: Need to change this to the relevant ones on a lucene upgrade