Also make use of the thread local memory reuse for a document being percolated with nested objects.
The memory index will only be reused for the root doc, since most of the time that will be the biggest document.
This commit is contained in:
parent
52d099dfae
commit
502f24d7e4
|
@ -25,6 +25,7 @@ import org.apache.lucene.analysis.TokenStream;
|
|||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.index.memory.MemoryIndex;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.util.CloseableThreadLocal;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.ParseContext;
|
||||
|
@ -32,34 +33,51 @@ import org.elasticsearch.index.mapper.ParsedDocument;
|
|||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Implementation of {@link PercolatorIndex} that can hold multiple Lucene documents by
|
||||
* opening multiple {@link MemoryIndex} based IndexReaders and wrapping them via a single top level reader.
|
||||
*/
|
||||
class MultiDocumentPercolatorIndex implements PercolatorIndex {
|
||||
|
||||
public MultiDocumentPercolatorIndex() {
|
||||
private final CloseableThreadLocal<MemoryIndex> cache;
|
||||
|
||||
MultiDocumentPercolatorIndex(CloseableThreadLocal<MemoryIndex> cache) {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(PercolateContext context, ParsedDocument parsedDocument) {
|
||||
int docCounter = 0;
|
||||
IndexReader[] memoryIndices = new IndexReader[parsedDocument.docs().size()];
|
||||
for (ParseContext.Document d : parsedDocument.docs()) {
|
||||
memoryIndices[docCounter] = indexDoc(d, parsedDocument.analyzer()).createSearcher().getIndexReader();
|
||||
docCounter++;
|
||||
List<ParseContext.Document> docs = parsedDocument.docs();
|
||||
int rootDocIndex = docs.size() - 1;
|
||||
assert rootDocIndex > 0;
|
||||
MemoryIndex rootDocMemoryIndex = null;
|
||||
for (int i = 0; i < docs.size(); i++) {
|
||||
ParseContext.Document d = docs.get(i);
|
||||
MemoryIndex memoryIndex;
|
||||
if (rootDocIndex == i) {
|
||||
// the last doc is always the rootDoc, since that is usually the biggest document it makes sense
|
||||
// to reuse the MemoryIndex it uses
|
||||
memoryIndex = rootDocMemoryIndex = cache.get();
|
||||
} else {
|
||||
memoryIndex = new MemoryIndex(true);
|
||||
}
|
||||
memoryIndices[i] = indexDoc(d, parsedDocument.analyzer(), memoryIndex).createSearcher().getIndexReader();
|
||||
}
|
||||
MultiReader mReader = new MultiReader(memoryIndices, true);
|
||||
try {
|
||||
AtomicReader slowReader = SlowCompositeReaderWrapper.wrap(mReader);
|
||||
DocSearcher docSearcher = new DocSearcher(new IndexSearcher(slowReader));
|
||||
DocSearcher docSearcher = new DocSearcher(new IndexSearcher(slowReader), rootDocMemoryIndex);
|
||||
context.initialize(docSearcher, parsedDocument);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("Failed to create index for percolator with nested document ", e);
|
||||
}
|
||||
}
|
||||
|
||||
MemoryIndex indexDoc(ParseContext.Document d, Analyzer analyzer) {
|
||||
MemoryIndex memoryIndex = new MemoryIndex(true);
|
||||
MemoryIndex indexDoc(ParseContext.Document d, Analyzer analyzer, MemoryIndex memoryIndex) {
|
||||
for (IndexableField field : d.getFields()) {
|
||||
if (!field.fieldType().indexed() && field.name().equals(UidFieldMapper.NAME)) {
|
||||
continue;
|
||||
|
@ -76,17 +94,14 @@ class MultiDocumentPercolatorIndex implements PercolatorIndex {
|
|||
return memoryIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clean() {
|
||||
// noop
|
||||
}
|
||||
|
||||
private class DocSearcher implements Engine.Searcher {
|
||||
|
||||
private final IndexSearcher searcher;
|
||||
private final MemoryIndex rootDocMemoryIndex;
|
||||
|
||||
private DocSearcher(IndexSearcher searcher) {
|
||||
private DocSearcher(IndexSearcher searcher, MemoryIndex rootDocMemoryIndex) {
|
||||
this.searcher = searcher;
|
||||
this.rootDocMemoryIndex = rootDocMemoryIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -108,6 +123,7 @@ class MultiDocumentPercolatorIndex implements PercolatorIndex {
|
|||
public boolean release() throws ElasticsearchException {
|
||||
try {
|
||||
searcher.getIndexReader().close();
|
||||
rootDocMemoryIndex.reset();
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("failed to close IndexReader in percolator with nested doc", e);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,9 @@ package org.elasticsearch.percolator;
|
|||
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
|
||||
|
||||
/**
|
||||
* Abstraction on how to index the percolator document.
|
||||
*/
|
||||
interface PercolatorIndex {
|
||||
|
||||
/**
|
||||
|
@ -32,9 +34,4 @@ interface PercolatorIndex {
|
|||
* */
|
||||
void prepare(PercolateContext context, ParsedDocument document);
|
||||
|
||||
/**
|
||||
* Release resources
|
||||
* */
|
||||
void clean();
|
||||
|
||||
}
|
||||
|
|
|
@ -23,8 +23,11 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.apache.lucene.index.AtomicReaderContext;
|
||||
import org.apache.lucene.index.ReaderUtil;
|
||||
import org.apache.lucene.index.memory.ExtendedMemoryIndex;
|
||||
import org.apache.lucene.index.memory.MemoryIndex;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.CloseableThreadLocal;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
|
@ -47,6 +50,8 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.text.BytesText;
|
||||
import org.elasticsearch.common.text.StringText;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
|
@ -114,6 +119,8 @@ public class PercolatorService extends AbstractComponent {
|
|||
private final SortParseElement sortParseElement;
|
||||
private final ScriptService scriptService;
|
||||
|
||||
private final CloseableThreadLocal<MemoryIndex> cache;
|
||||
|
||||
@Inject
|
||||
public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler,
|
||||
PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
|
||||
|
@ -131,8 +138,15 @@ public class PercolatorService extends AbstractComponent {
|
|||
this.scriptService = scriptService;
|
||||
this.sortParseElement = new SortParseElement();
|
||||
|
||||
single = new SingleDocumentPercolatorIndex(settings);
|
||||
multi = new MultiDocumentPercolatorIndex();
|
||||
final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes();
|
||||
cache = new CloseableThreadLocal<MemoryIndex>() {
|
||||
@Override
|
||||
protected MemoryIndex initialValue() {
|
||||
return new ExtendedMemoryIndex(true, maxReuseBytes);
|
||||
}
|
||||
};
|
||||
single = new SingleDocumentPercolatorIndex(cache);
|
||||
multi = new MultiDocumentPercolatorIndex(cache);
|
||||
|
||||
percolatorTypes = new ByteObjectOpenHashMap<PercolatorType>(6);
|
||||
percolatorTypes.put(countPercolator.id(), countPercolator);
|
||||
|
@ -385,8 +399,7 @@ public class PercolatorService extends AbstractComponent {
|
|||
}
|
||||
|
||||
public void close() {
|
||||
single.clean();
|
||||
multi.clean();
|
||||
cache.close();;
|
||||
}
|
||||
|
||||
interface PercolatorType {
|
||||
|
|
|
@ -23,32 +23,26 @@ package org.elasticsearch.percolator;
|
|||
import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.memory.ExtendedMemoryIndex;
|
||||
import org.apache.lucene.index.memory.MemoryIndex;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.util.CloseableThreadLocal;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Implementation of {@link PercolatorIndex} that can only hold a single Lucene document
|
||||
* and is optimized for that
|
||||
*/
|
||||
class SingleDocumentPercolatorIndex implements PercolatorIndex {
|
||||
|
||||
private final CloseableThreadLocal<MemoryIndex> cache;
|
||||
|
||||
public SingleDocumentPercolatorIndex(Settings settings) {
|
||||
final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes();
|
||||
cache = new CloseableThreadLocal<MemoryIndex>() {
|
||||
@Override
|
||||
protected MemoryIndex initialValue() {
|
||||
return new ExtendedMemoryIndex(true, maxReuseBytes);
|
||||
}
|
||||
};
|
||||
SingleDocumentPercolatorIndex(CloseableThreadLocal<MemoryIndex> cache) {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -70,11 +64,6 @@ class SingleDocumentPercolatorIndex implements PercolatorIndex {
|
|||
context.initialize(new DocEngineSearcher(memoryIndex), parsedDocument);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clean() {
|
||||
cache.close();
|
||||
}
|
||||
|
||||
private class DocEngineSearcher implements Engine.Searcher {
|
||||
|
||||
private final IndexSearcher searcher;
|
||||
|
|
Loading…
Reference in New Issue