Make PerThreadIDAndVersionLookup per-segment

Closes #14070

Squashed commit of the following:

commit ad5829e590cf6150763c31886e97d69976b7f02d
Author: Robert Muir <rmuir@apache.org>
Date:   Tue Oct 13 06:50:25 2015 -0400

    Add more tests for version map caching

commit 617333eefd6d4ddc99db491d0843827b1b5936b2
Author: Robert Muir <rmuir@apache.org>
Date:   Mon Oct 12 13:25:51 2015 -0400

    fix javadocs ref

commit 9626dd02e01a2676538e1071e302d882fc870837
Author: Robert Muir <rmuir@apache.org>
Date:   Mon Oct 12 13:23:16 2015 -0400

    make the version map cache per-segment
This commit is contained in:
Robert Muir 2015-10-13 08:34:48 -04:00
parent a550ebf381
commit ceb969a13e
4 changed files with 258 additions and 100 deletions

View File

@ -20,11 +20,9 @@ package org.elasticsearch.common.lucene.uid;
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
@ -47,107 +45,86 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
* finds. */
final class PerThreadIDAndVersionLookup {
// TODO: do we really need to store all this stuff? some if it might not speed up anything.
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
private final LeafReaderContext[] readerContexts;
private final TermsEnum[] termsEnums;
private final PostingsEnum[] docsEnums;
// Only used for back compat, to lookup a version from payload:
private final PostingsEnum[] posEnums;
private final Bits[] liveDocs;
private final NumericDocValues[] versions;
private final int numSegs;
private final boolean hasDeletions;
private final boolean[] hasPayloads;
/** terms enum for uid field */
private final TermsEnum termsEnum;
/** _version data */
private final NumericDocValues versions;
/** Only true when versions are indexed as payloads instead of docvalues */
private final boolean hasPayloads;
/** Reused for iteration (when the term exists) */
private PostingsEnum docsEnum;
/** Only used for back compat, to lookup a version from payload */
private PostingsEnum posEnum;
public PerThreadIDAndVersionLookup(IndexReader r) throws IOException {
/**
* Initialize lookup for the provided segment
*/
public PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
TermsEnum termsEnum = null;
NumericDocValues versions = null;
boolean hasPayloads = false;
List<LeafReaderContext> leaves = new ArrayList<>(r.leaves());
readerContexts = leaves.toArray(new LeafReaderContext[leaves.size()]);
termsEnums = new TermsEnum[leaves.size()];
docsEnums = new PostingsEnum[leaves.size()];
posEnums = new PostingsEnum[leaves.size()];
liveDocs = new Bits[leaves.size()];
versions = new NumericDocValues[leaves.size()];
hasPayloads = new boolean[leaves.size()];
int numSegs = 0;
boolean hasDeletions = false;
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for(int i=leaves.size()-1;i>=0;i--) {
LeafReaderContext readerContext = leaves.get(i);
Fields fields = readerContext.reader().fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
readerContexts[numSegs] = readerContext;
hasPayloads[numSegs] = terms.hasPayloads();
termsEnums[numSegs] = terms.iterator();
assert termsEnums[numSegs] != null;
liveDocs[numSegs] = readerContext.reader().getLiveDocs();
hasDeletions |= readerContext.reader().hasDeletions();
versions[numSegs] = readerContext.reader().getNumericDocValues(VersionFieldMapper.NAME);
numSegs++;
}
Fields fields = reader.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
hasPayloads = terms.hasPayloads();
termsEnum = terms.iterator();
assert termsEnum != null;
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
}
}
this.numSegs = numSegs;
this.hasDeletions = hasDeletions;
this.versions = versions;
this.termsEnum = termsEnum;
this.hasPayloads = hasPayloads;
}
/** Return null if id is not found. */
public DocIdAndVersion lookup(BytesRef id) throws IOException {
for(int seg=0;seg<numSegs;seg++) {
if (termsEnums[seg].seekExact(id)) {
public DocIdAndVersion lookup(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
if (termsEnum.seekExact(id)) {
if (versions != null || hasPayloads == false) {
// Use NDV to retrieve the version, in which case we only need PostingsEnum:
NumericDocValues segVersions = versions[seg];
if (segVersions != null || hasPayloads[seg] == false) {
// Use NDV to retrieve the version, in which case we only need PostingsEnum:
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
PostingsEnum docs = docsEnums[seg] = termsEnums[seg].postings(docsEnums[seg], 0);
final Bits liveDocs = this.liveDocs[seg];
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docs.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docs.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
if (segVersions != null) {
return new DocIdAndVersion(docID, segVersions.get(docID), readerContexts[seg]);
} else {
// _uid found, but no doc values and no payloads
return new DocIdAndVersion(docID, Versions.NOT_SET, readerContexts[seg]);
}
} else {
assert hasDeletions;
continue;
}
}
// ... but used to be stored as payloads; in this case we must use PostingsEnum
PostingsEnum dpe = posEnums[seg] = termsEnums[seg].postings(posEnums[seg], PostingsEnum.PAYLOADS);
assert dpe != null; // terms has payloads
final Bits liveDocs = this.liveDocs[seg];
for (int d = dpe.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = dpe.nextDoc()) {
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
dpe.nextPosition();
final BytesRef payload = dpe.getPayload();
if (payload != null && payload.length == 8) {
// TODO: does this break the nested docs case? we are not returning the last matching docID here?
return new DocIdAndVersion(d, Numbers.bytesToLong(payload), readerContexts[seg]);
docID = d;
}
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
if (versions != null) {
return new DocIdAndVersion(docID, versions.get(docID), context);
} else {
// _uid found, but no doc values and no payloads
return new DocIdAndVersion(docID, Versions.NOT_SET, context);
}
}
}
// ... but used to be stored as payloads; in this case we must use PostingsEnum
posEnum = termsEnum.postings(posEnum, PostingsEnum.PAYLOADS);
assert posEnum != null; // terms has payloads
for (int d = posEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = posEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
posEnum.nextPosition();
final BytesRef payload = posEnum.getPayload();
if (payload != null && payload.length == 8) {
// TODO: does this break the nested docs case? we are not returning the last matching docID here?
return new DocIdAndVersion(d, Numbers.bytesToLong(payload), context);
}
}
}
return null;
}
// TODO: add reopen method to carry over re-used enums...?
}

View File

@ -20,7 +20,8 @@
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexReader.ReaderClosedListener;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.CloseableThreadLocal;
@ -28,6 +29,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
@ -41,7 +43,7 @@ public class Versions {
/**
* used when the document is old and doesn't contain any version information in the index
* see {@link PerThreadIDAndVersionLookup#lookup(org.apache.lucene.util.BytesRef)}
* see {@link PerThreadIDAndVersionLookup#lookup}
*/
public static final long NOT_SET = -2L;
@ -52,30 +54,31 @@ public class Versions {
public static final long MATCH_DELETED = -4L;
// TODO: is there somewhere else we can store these?
private static final ConcurrentMap<IndexReader, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
// Evict this reader from lookupStates once it's closed:
private static final ReaderClosedListener removeLookupState = new ReaderClosedListener() {
private static final CoreClosedListener removeLookupState = new CoreClosedListener() {
@Override
public void onClose(IndexReader reader) {
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(reader);
public void onClose(Object key) {
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
if (ctl != null) {
ctl.close();
}
}
};
private static PerThreadIDAndVersionLookup getLookupState(IndexReader reader) throws IOException {
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(reader);
private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) throws IOException {
Object key = reader.getCoreCacheKey();
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
if (ctl == null) {
// First time we are seeing this reader; make a
// First time we are seeing this reader's core; make a
// new CTL:
ctl = new CloseableThreadLocal<>();
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(reader, ctl);
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(key, ctl);
if (other == null) {
// Our CTL won, we must remove it when the
// reader is closed:
reader.addReaderClosedListener(removeLookupState);
// core is closed:
reader.addCoreClosedListener(removeLookupState);
} else {
// Another thread beat us to it: just use
// their CTL:
@ -116,7 +119,22 @@ public class Versions {
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME);
return getLookupState(reader).lookup(term.bytes());
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
DocIdAndVersion result = lookup.lookup(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}
/**

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.test.ESTestCase;
/**
* test per-segment lookup of version-related datastructures
*/
public class VersionLookupTests extends ESTestCase {
/**
* test version lookup actually works
*/
public void testSimple() throws Exception {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
Document doc = new Document();
doc.add(new Field(UidFieldMapper.NAME, "6", UidFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer, false);
LeafReaderContext segment = reader.leaves().get(0);
PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
// found doc
DocIdAndVersion result = lookup.lookup(new BytesRef("6"), null, segment);
assertNotNull(result);
assertEquals(87, result.version);
assertEquals(0, result.docId);
// not found doc
assertNull(lookup.lookup(new BytesRef("7"), null, segment));
// deleted doc
assertNull(lookup.lookup(new BytesRef("6"), new Bits.MatchNoBits(1), segment));
reader.close();
writer.close();
dir.close();
}
/**
* test version lookup with two documents matching the ID
*/
public void testTwoDocuments() throws Exception {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
Document doc = new Document();
doc.add(new Field(UidFieldMapper.NAME, "6", UidFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
writer.addDocument(doc);
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer, false);
LeafReaderContext segment = reader.leaves().get(0);
PerThreadIDAndVersionLookup lookup = new PerThreadIDAndVersionLookup(segment.reader());
// return the last doc when there are duplicates
DocIdAndVersion result = lookup.lookup(new BytesRef("6"), null, segment);
assertNotNull(result);
assertEquals(87, result.version);
assertEquals(1, result.docId);
// delete the first doc only
FixedBitSet live = new FixedBitSet(2);
live.set(1);
result = lookup.lookup(new BytesRef("6"), live, segment);
assertNotNull(result);
assertEquals(87, result.version);
assertEquals(1, result.docId);
// delete the second doc only
live.clear(1);
live.set(0);
result = lookup.lookup(new BytesRef("6"), live, segment);
assertNotNull(result);
assertEquals(87, result.version);
assertEquals(0, result.docId);
// delete both docs
assertNull(lookup.lookup(new BytesRef("6"), new Bits.MatchNoBits(2), segment));
reader.close();
writer.close();
dir.close();
}
}

View File

@ -41,9 +41,11 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
@ -298,4 +300,55 @@ public class VersionsTests extends ESTestCase {
ir.close();
dir.close();
}
/** Test that version map cache works, is evicted on close, etc */
public void testCache() throws Exception {
int size = Versions.lookupStates.size();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
Document doc = new Document();
doc.add(new Field(UidFieldMapper.NAME, "6", UidFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer, false);
// should increase cache size by 1
assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
assertEquals(size+1, Versions.lookupStates.size());
// should be cache hit
assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
assertEquals(size+1, Versions.lookupStates.size());
reader.close();
writer.close();
// core should be evicted from the map
assertEquals(size, Versions.lookupStates.size());
dir.close();
}
/** Test that version map cache behaves properly with a filtered reader */
public void testCacheFilterReader() throws Exception {
int size = Versions.lookupStates.size();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
Document doc = new Document();
doc.add(new Field(UidFieldMapper.NAME, "6", UidFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer, false);
assertEquals(87, Versions.loadVersion(reader, new Term(UidFieldMapper.NAME, "6")));
assertEquals(size+1, Versions.lookupStates.size());
// now wrap the reader
DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", 5));
assertEquals(87, Versions.loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6")));
// same size map: core cache key is shared
assertEquals(size+1, Versions.lookupStates.size());
reader.close();
writer.close();
// core should be evicted from the map
assertEquals(size, Versions.lookupStates.size());
dir.close();
}
}