From 5a6fa628443036a0dffca7b3ded9e1660c4c0d7b Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 15 Jun 2017 10:17:42 +0200 Subject: [PATCH] Speed up PK lookups at index time. (#19856) At index time Elasticsearch needs to look up the version associated with the `_id` of the document that is being indexed, which is often the bottleneck for indexing. While reviewing the output of the `jfr` telemetry from a Rally benchmark, I saw that significant time was spent in `ConcurrentHashMap#get` and `ThreadLocal#get`. The reason is that we cache lookup objects per thread and segment, and for every indexed document, we first need to look up the cache associated with this segment (`ConcurrentHashMap#get`) and then get a state that is local to the current thread (`ThreadLocal#get`). So if you are indexing N documents per second and have S segments, both of these methods will be called N*S times per second. This commit changes version lookup to use a cache per index reader rather than per segment. While this shortens the lifetime of cache entries, we now only need to do one call to `ConcurrentHashMap#get` and `ThreadLocal#get` per indexed document. --- .../uid/PerThreadIDVersionAndSeqNoLookup.java | 44 ++++++------ .../lucene/uid/VersionsAndSeqNoResolver.java | 69 +++++++++++-------- .../common/lucene/uid/VersionLookupTests.java | 53 ++++++++------ 3 files changed, 92 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index e8b47783afb..fe26e392d59 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -43,12 +43,21 @@ import java.io.IOException; * not thread safe, so it is the caller's job to create and use one * instance of this per thread. 
Do not use this if a term may appear * in more than one document! It will only return the first one it - * finds. */ - + * finds. + * This class uses live docs, so it should be cached based on the + * {@link org.apache.lucene.index.IndexReader#getReaderCacheHelper() reader cache helper} + * rather than the {@link LeafReader#getCoreCacheHelper() core cache helper}. + */ final class PerThreadIDVersionAndSeqNoLookup { // TODO: do we really need to store all this stuff? some if it might not speed up anything. // we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff + /** The {@link LeafReaderContext} that needs to be looked up. */ + private final LeafReaderContext context; + /** Live docs of the context, cached to avoid the cost of ensureOpen() on every + * segment for every index operation. */ + private final Bits liveDocs; + /** terms enum for uid field */ final String uidField; private final TermsEnum termsEnum; @@ -62,7 +71,10 @@ final class PerThreadIDVersionAndSeqNoLookup { /** * Initialize lookup for the provided segment */ - PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException { + PerThreadIDVersionAndSeqNoLookup(LeafReaderContext context, String uidField) throws IOException { + this.context = context; + final LeafReader reader = context.reader(); + this.liveDocs = reader.getLiveDocs(); this.uidField = uidField; Fields fields = reader.fields(); Terms terms = fields.terms(uidField); @@ -80,11 +92,11 @@ final class PerThreadIDVersionAndSeqNoLookup { } /** Return null if id is not found. 
*/ - public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context) + public DocIdAndVersion lookupVersion(BytesRef id) throws IOException { assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) : "context's reader is not the same as the reader class was initialized on."; - int docID = getDocID(id, liveDocs); + int docID = getDocID(id); if (docID != DocIdSetIterator.NO_MORE_DOCS) { final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME); @@ -104,7 +116,7 @@ final class PerThreadIDVersionAndSeqNoLookup { * returns the internal lucene doc id for the given id bytes. * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found * */ - private int getDocID(BytesRef id, Bits liveDocs) throws IOException { + private int getDocID(BytesRef id) throws IOException { if (termsEnum.seekExact(id)) { int docID = DocIdSetIterator.NO_MORE_DOCS; // there may be more than one matching docID, in the case of nested docs, so we want the last one: @@ -122,10 +134,8 @@ final class PerThreadIDVersionAndSeqNoLookup { } /** Return null if id is not found. */ - DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException { - assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) : - "context's reader is not the same as the reader class was initialized on."; - int docID = getDocID(id, liveDocs); + DocIdAndSeqNo lookupSeqNo(BytesRef id) throws IOException { + int docID = getDocID(id); if (docID != DocIdSetIterator.NO_MORE_DOCS) { NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); long seqNo; @@ -139,18 +149,4 @@ final class PerThreadIDVersionAndSeqNoLookup { return null; } } - - /** - * returns 0 if the primary term is not found. - * - * Note that 0 is an illegal primary term. 
See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} - **/ - long lookUpPrimaryTerm(int docID, LeafReader reader) throws IOException { - NumericDocValues primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - if (primaryTerms != null && primaryTerms.advanceExact(docID)) { - return primaryTerms.longValue(); - } else { - return 0; - } - } } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 3cdbfa38b62..1740e7877ac 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -20,11 +20,12 @@ package org.elasticsearch.common.lucene.uid; import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import java.io.IOException; import java.util.List; @@ -36,26 +37,31 @@ import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; /** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. 
*/ public final class VersionsAndSeqNoResolver { - static final ConcurrentMap> lookupStates = + static final ConcurrentMap> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); // Evict this reader from lookupStates once it's closed: private static final IndexReader.ClosedListener removeLookupState = key -> { - CloseableThreadLocal ctl = lookupStates.remove(key); + CloseableThreadLocal ctl = lookupStates.remove(key); if (ctl != null) { ctl.close(); } }; - private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader, String uidField) throws IOException { - IndexReader.CacheHelper cacheHelper = reader.getCoreCacheHelper(); - CloseableThreadLocal ctl = lookupStates.get(cacheHelper.getKey()); + private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader reader, String uidField) throws IOException { + // We cache on the top level + // This means cache entries have a shorter lifetime, maybe as low as 1s with the + // default refresh interval and a steady indexing rate, but on the other hand it + // proved to be cheaper than having to perform a CHM and a TL get for every segment. + // See https://github.com/elastic/elasticsearch/pull/19856. 
+ IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper(); + CloseableThreadLocal ctl = lookupStates.get(cacheHelper.getKey()); if (ctl == null) { // First time we are seeing this reader's core; make a new CTL: ctl = new CloseableThreadLocal<>(); - CloseableThreadLocal other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl); + CloseableThreadLocal other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl); if (other == null) { - // Our CTL won, we must remove it when the core is closed: + // Our CTL won, we must remove it when the reader is closed: cacheHelper.addClosedListener(removeLookupState); } else { // Another thread beat us to it: just use their CTL: @@ -63,13 +69,22 @@ public final class VersionsAndSeqNoResolver { } } - PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get(); + PerThreadIDVersionAndSeqNoLookup[] lookupState = ctl.get(); if (lookupState == null) { - lookupState = new PerThreadIDVersionAndSeqNoLookup(reader, uidField); + lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()]; + for (LeafReaderContext leaf : reader.leaves()) { + lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf, uidField); + } ctl.set(lookupState); - } else if (Objects.equals(lookupState.uidField, uidField) == false) { + } + + if (lookupState.length != reader.leaves().size()) { + throw new AssertionError("Mismatched numbers of leaves: " + lookupState.length + " != " + reader.leaves().size()); + } + + if (lookupState.length > 0 && Objects.equals(lookupState[0].uidField, uidField) == false) { throw new AssertionError("Index does not consistently use the same uid field: [" - + uidField + "] != [" + lookupState.uidField + "]"); + + uidField + "] != [" + lookupState[0].uidField + "]"); } return lookupState; @@ -112,17 +127,13 @@ public final class VersionsAndSeqNoResolver { * */ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException { + PerThreadIDVersionAndSeqNoLookup[] lookups = 
getLookupState(reader, term.field()); List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return null; - } // iterate backwards to optimize for the frequently updated documents // which are likely to be in the last segments for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReaderContext context = leaves.get(i); - LeafReader leaf = context.reader(); - PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field()); - DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context); + PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord]; + DocIdAndVersion result = lookup.lookupVersion(term.bytes()); if (result != null) { return result; } @@ -137,17 +148,13 @@ public final class VersionsAndSeqNoResolver { * */ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException { + PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field()); List leaves = reader.leaves(); - if (leaves.isEmpty()) { - return null; - } // iterate backwards to optimize for the frequently updated documents // which are likely to be in the last segments for (int i = leaves.size() - 1; i >= 0; i--) { - LeafReaderContext context = leaves.get(i); - LeafReader leaf = context.reader(); - PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field()); - DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context); + PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord]; + DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes()); if (result != null) { return result; } @@ -159,9 +166,13 @@ public final class VersionsAndSeqNoResolver { * Load the primaryTerm associated with the given {@link DocIdAndSeqNo} */ public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException { - LeafReader leaf = docIdAndSeqNo.context.reader(); - PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, uidField); - 
long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId, leaf); + NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + long result; + if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) { + result = primaryTerms.longValue(); + } else { + result = 0; + } assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]" + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]"; return result; diff --git a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java index e8b5220396d..ccede9dea50 100644 --- a/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java +++ b/core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java @@ -26,10 +26,10 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.Term; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -46,23 +46,31 @@ public class VersionLookupTests extends ESTestCase { */ public void testSimple() throws Exception { Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); + IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER) + // to have deleted docs + .setMergePolicy(NoMergePolicy.INSTANCE)); 
Document doc = new Document(); doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE)); doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87)); writer.addDocument(doc); + writer.addDocument(new Document()); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME); // found doc - DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); + DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6")); assertNotNull(result); assertEquals(87, result.version); assertEquals(0, result.docId); // not found doc - assertNull(lookup.lookupVersion(new BytesRef("7"), null, segment)); + assertNull(lookup.lookupVersion(new BytesRef("7"))); // deleted doc - assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(1), segment)); + writer.deleteDocuments(new Term(IdFieldMapper.NAME, "6")); + reader.close(); + reader = DirectoryReader.open(writer); + segment = reader.leaves().get(0); + lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME); + assertNull(lookup.lookupVersion(new BytesRef("6"))); reader.close(); writer.close(); dir.close(); @@ -73,36 +81,39 @@ public class VersionLookupTests extends ESTestCase { */ public void testTwoDocuments() throws Exception { Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); + IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER) + .setMergePolicy(NoMergePolicy.INSTANCE)); Document doc = new Document(); doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE)); doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87)); writer.addDocument(doc); 
writer.addDocument(doc); + writer.addDocument(new Document()); DirectoryReader reader = DirectoryReader.open(writer); LeafReaderContext segment = reader.leaves().get(0); - PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME); + PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME); // return the last doc when there are duplicates - DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment); + DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6")); assertNotNull(result); assertEquals(87, result.version); assertEquals(1, result.docId); // delete the first doc only - FixedBitSet live = new FixedBitSet(2); - live.set(1); - result = lookup.lookupVersion(new BytesRef("6"), live, segment); + assertTrue(writer.tryDeleteDocument(reader, 0) >= 0); + reader.close(); + reader = DirectoryReader.open(writer); + segment = reader.leaves().get(0); + lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME); + result = lookup.lookupVersion(new BytesRef("6")); assertNotNull(result); assertEquals(87, result.version); assertEquals(1, result.docId); - // delete the second doc only - live.clear(1); - live.set(0); - result = lookup.lookupVersion(new BytesRef("6"), live, segment); - assertNotNull(result); - assertEquals(87, result.version); - assertEquals(0, result.docId); // delete both docs - assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(2), segment)); + assertTrue(writer.tryDeleteDocument(reader, 1) >= 0); + reader.close(); + reader = DirectoryReader.open(writer); + segment = reader.leaves().get(0); + lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME); + assertNull(lookup.lookupVersion(new BytesRef("6"))); reader.close(); writer.close(); dir.close();