From 7552b69b1f3e06248edee06221ab9e7655107d67 Mon Sep 17 00:00:00 2001 From: mikemccand Date: Sat, 31 May 2014 17:38:48 -0400 Subject: [PATCH] Core: reuse Lucene's TermsEnum for faster _uid/version lookup during Reusing Lucene's TermsEnum for _uid/version lookups gives a small indexing (updates) speedup and brings us a step closer to not having to spend RAM on bloom filters. Closes #6212 --- .../uid/PerThreadIDAndVersionLookup.java | 150 ++++++++++++++++++ .../common/lucene/uid/Versions.java | 118 ++++++-------- .../codec/postingsformat/PostingFormats.java | 2 +- 3 files changed, 201 insertions(+), 69 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java diff --git a/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java b/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java new file mode 100644 index 00000000000..7508463ae71 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java @@ -0,0 +1,150 @@ +package org.elasticsearch.common.lucene.uid; + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.index.DocsAndPositionsEnum; +import org.apache.lucene.index.DocsEnum; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.common.Numbers; +import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion; +import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.mapper.internal.VersionFieldMapper; + + +/** Utility class to do efficient primary-key (only 1 doc contains the + * given term) lookups by segment, re-using the enums. This class is + * not thread safe, so it is the caller's job to create and use one + * instance of this per thread. Do not use this if a term may appear + * in more than one document! It will only return the first one it + * finds. 
*/ + +final class PerThreadIDAndVersionLookup { + + private final AtomicReaderContext[] readerContexts; + private final TermsEnum[] termsEnums; + private final DocsEnum[] docsEnums; + // Only used for back compat, to lookup a version from payload: + private final DocsAndPositionsEnum[] posEnums; + private final Bits[] liveDocs; + private final NumericDocValues[] versions; + private final int numSegs; + private final boolean hasDeletions; + private final boolean[] hasPayloads; + + public PerThreadIDAndVersionLookup(IndexReader r) throws IOException { + + List leaves = new ArrayList<>(r.leaves()); + + readerContexts = leaves.toArray(new AtomicReaderContext[leaves.size()]); + termsEnums = new TermsEnum[leaves.size()]; + docsEnums = new DocsEnum[leaves.size()]; + posEnums = new DocsAndPositionsEnum[leaves.size()]; + liveDocs = new Bits[leaves.size()]; + versions = new NumericDocValues[leaves.size()]; + hasPayloads = new boolean[leaves.size()]; + int numSegs = 0; + boolean hasDeletions = false; + // iterate backwards to optimize for the frequently updated documents + // which are likely to be in the last segments + for(int i=leaves.size()-1;i>=0;i--) { + AtomicReaderContext readerContext = leaves.get(i); + Fields fields = readerContext.reader().fields(); + if (fields != null) { + Terms terms = fields.terms(UidFieldMapper.NAME); + if (terms != null) { + readerContexts[numSegs] = readerContext; + hasPayloads[numSegs] = terms.hasPayloads(); + termsEnums[numSegs] = terms.iterator(null); + assert termsEnums[numSegs] != null; + liveDocs[numSegs] = readerContext.reader().getLiveDocs(); + hasDeletions |= readerContext.reader().hasDeletions(); + versions[numSegs] = readerContext.reader().getNumericDocValues(VersionFieldMapper.NAME); + numSegs++; + } + } + } + this.numSegs = numSegs; + this.hasDeletions = hasDeletions; + } + + /** Return null if id is not found. 
*/ + public DocIdAndVersion lookup(BytesRef id) throws IOException { + for(int seg=0;seg> lookupStates = new ConcurrentHashMap<>(); + + // Evict this reader from lookupStates once it's closed: + private static final ReaderClosedListener removeLookupState = new ReaderClosedListener() { + @Override + public void onClose(IndexReader reader) { + CloseableThreadLocal ctl = lookupStates.remove(reader); + if (ctl != null) { + ctl.close(); + } + } + }; + + private static PerThreadIDAndVersionLookup getLookupState(IndexReader reader) throws IOException { + CloseableThreadLocal ctl = lookupStates.get(reader); + if (ctl == null) { + // First time we are seeing this reader; make a + // new CTL: + ctl = new CloseableThreadLocal(); + CloseableThreadLocal other = lookupStates.putIfAbsent(reader, ctl); + if (other == null) { + // Our CTL won, we must remove it when the + // reader is closed: + reader.addReaderClosedListener(removeLookupState); + } else { + // Another thread beat us to it: just use + // their CTL: + ctl = other; + } + } + + PerThreadIDAndVersionLookup lookupState = ctl.get(); + if (lookupState == null) { + lookupState = new PerThreadIDAndVersionLookup(reader); + ctl.set(lookupState); + } + + return lookupState; + } + public static void writeVersion(long version, StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_1_2_0) && version == MATCH_ANY) { // we have to send out a value the node will understand @@ -105,17 +150,8 @@ public class Versions { * */ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException { - // iterate backwards to optimize for the frequently updated documents - // which are likely to be in the last segments - final List leaves = reader.leaves(); - for (int i = leaves.size() - 1; i >= 0; --i) { - final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(leaves.get(i), term); - if (docIdAndVersion != null) { - assert docIdAndVersion.version != NOT_FOUND; - return 
docIdAndVersion; - } - } - return null; + assert term.field().equals(UidFieldMapper.NAME); + return getLookupState(reader).lookup(term.bytes()); } /** @@ -129,58 +165,4 @@ public class Versions { final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; } - - /** Same as {@link #loadDocIdAndVersion(IndexReader, Term)} but operates directly on a reader context. */ - public static DocIdAndVersion loadDocIdAndVersion(AtomicReaderContext readerContext, Term term) throws IOException { - assert term.field().equals(UidFieldMapper.NAME); - final AtomicReader reader = readerContext.reader(); - final Bits liveDocs = reader.getLiveDocs(); - final Terms terms = reader.terms(UidFieldMapper.NAME); - assert terms != null : "All segments must have a _uid field, but " + reader + " doesn't"; - final TermsEnum termsEnum = terms.iterator(null); - if (!termsEnum.seekExact(term.bytes())) { - return null; - } - - // Versions are stored as doc values... - final NumericDocValues versions = reader.getNumericDocValues(VersionFieldMapper.NAME); - if (versions != null || !terms.hasPayloads()) { - // only the last doc that matches the _uid is interesting here: if it is deleted, then there is - // no match otherwise previous docs are necessarily either deleted or nested docs - final DocsEnum docs = termsEnum.docs(null, null); - int docID = DocsEnum.NO_MORE_DOCS; - for (int d = docs.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = docs.nextDoc()) { - docID = d; - } - assert docID != DocsEnum.NO_MORE_DOCS; // would mean that the term exists but has no match at all - if (liveDocs != null && !liveDocs.get(docID)) { - return null; - } else if (versions != null) { - return new DocIdAndVersion(docID, versions.get(docID), readerContext); - } else { - // _uid found, but no doc values and no payloads - return new DocIdAndVersion(docID, NOT_SET, readerContext); - } - } - - // ... 
but used to be stored as payloads - final DocsAndPositionsEnum dpe = termsEnum.docsAndPositions(liveDocs, null, DocsAndPositionsEnum.FLAG_PAYLOADS); - assert dpe != null; // terms has payloads - int docID = DocsEnum.NO_MORE_DOCS; - for (int d = dpe.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = dpe.nextDoc()) { - docID = d; - dpe.nextPosition(); - final BytesRef payload = dpe.getPayload(); - if (payload != null && payload.length == 8) { - return new DocIdAndVersion(d, Numbers.bytesToLong(payload), readerContext); - } - } - - if (docID == DocsEnum.NO_MORE_DOCS) { - return null; - } else { - return new DocIdAndVersion(docID, NOT_SET, readerContext); - } - } - } diff --git a/src/main/java/org/elasticsearch/index/codec/postingsformat/PostingFormats.java b/src/main/java/org/elasticsearch/index/codec/postingsformat/PostingFormats.java index 86d56648eaf..920d4e4d479 100644 --- a/src/main/java/org/elasticsearch/index/codec/postingsformat/PostingFormats.java +++ b/src/main/java/org/elasticsearch/index/codec/postingsformat/PostingFormats.java @@ -67,7 +67,7 @@ public class PostingFormats { for (String luceneName : PostingsFormat.availablePostingsFormats()) { buildInPostingFormatsX.put(luceneName, new PreBuiltPostingsFormatProvider.Factory(PostingsFormat.forName(luceneName))); } - final Elasticsearch090PostingsFormat defaultFormat = new Elasticsearch090PostingsFormat(); + final PostingsFormat defaultFormat = new Elasticsearch090PostingsFormat(); buildInPostingFormatsX.put("direct", new PreBuiltPostingsFormatProvider.Factory("direct", PostingsFormat.forName("Direct"))); buildInPostingFormatsX.put("memory", new PreBuiltPostingsFormatProvider.Factory("memory", PostingsFormat.forName("Memory"))); // LUCENE UPGRADE: Need to change this to the relevant ones on a lucene upgrade