Add more pickyness to index warming and searcher wrappping

this commit also fixes a bug where we wramed a leaf reader in a top level context
which caused atomic segment readers to be used in our top level caches.
This commit is contained in:
Simon Willnauer 2015-10-13 17:17:17 +02:00
parent dac1799305
commit e3f00e302c
6 changed files with 46 additions and 18 deletions

View File

@ -74,6 +74,12 @@ public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {
}
}
/**
* Adds the given listener to the provided directory reader. The reader must contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
* otherwise we can't safely install the listener.
*
* @throws IllegalArgumentException if the reader doesn't contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
*/
@SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener")
public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) {
ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader);
@ -82,9 +88,12 @@ public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {
elasticsearchDirectoryReader.addReaderClosedListener(listener);
return;
}
throw new IllegalStateException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader");
throw new IllegalArgumentException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader");
}
/**
* Tries to unwrap the given reader until the first {@link ElasticsearchDirectoryReader} instance is found or <code>null</code> if no instance is found;
*/
public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) {
if (reader instanceof FilterDirectoryReader) {
if (reader instanceof ElasticsearchDirectoryReader) {

View File

@ -18,10 +18,7 @@
*/
package org.elasticsearch.common.lucene.index;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.index.shard.ShardId;
/**
@ -38,7 +35,7 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
*
* @param in specified base reader.
*/
ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
public ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
super(in);
this.shardId = shardId;
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
@ -905,9 +906,10 @@ public class InternalEngine extends Engine {
@Override
public void warm(LeafReader reader) throws IOException {
try {
assert isMergedSegment(reader);
LeafReader esLeafReader = new ElasticsearchLeafReader(reader, shardId);
assert isMergedSegment(esLeafReader);
if (warmer != null) {
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(reader, null));
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(esLeafReader, null));
final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
warmer.warmNewReaders(context);
}
@ -949,6 +951,9 @@ public class InternalEngine extends Engine {
@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
IndexSearcher searcher = super.newSearcher(reader, previousReader);
if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) {
return searcher;
}
if (warmer != null) {
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
// we will release explicitly
@ -986,10 +991,11 @@ public class InternalEngine extends Engine {
}
if (newSearcher != null) {
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher));
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("new_reader_warming", newSearcher));
warmer.warmNewReaders(context);
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("top_reader_warming", searcher)));
} catch (Throwable e) {
if (isEngineClosed.get() == false) {
logger.warn("failed to prepare/warm", e);

View File

@ -22,6 +22,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiDocValues;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.join.JoinUtil;
import org.apache.lucene.search.join.ScoreMode;
@ -288,12 +289,23 @@ public class HasChildQueryBuilder extends AbstractQueryBuilder<HasChildQueryBuil
if (getBoost() != 1.0F) {
return super.rewrite(reader);
}
if (reader instanceof DirectoryReader) {
String joinField = ParentFieldMapper.joinField(parentType);
IndexSearcher indexSearcher = new IndexSearcher(reader);
indexSearcher.setQueryCache(null);
IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal((DirectoryReader) reader);
MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType);
return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren);
} else {
if (reader.leaves().isEmpty() && reader.numDocs() == 0) {
// asserting reader passes down a MultiReader during rewrite which makes this
// blow up since for this query to work we have to have a DirectoryReader otherwise
// we can't load global ordinals - for this to work we simply check if the reader has no leaves
// and rewrite to match nothing
return new MatchNoDocsQuery();
}
throw new IllegalStateException("can't load global ordinals for reader of type: " + reader.getClass() + " must be a DirectoryReader");
}
}
@Override

View File

@ -79,7 +79,7 @@ public class IndexSearcherWrapper {
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
Engine.Searcher newSearcher = new Engine.Searcher(engineSearcher.source(), indexSearcher) {
@Override
public void close() throws ElasticsearchException {
try {
@ -92,6 +92,9 @@ public class IndexSearcherWrapper {
}
};
// TODO should this be a real exception? this checks that our wrapper doesn't wrap in it's own ElasticsearchDirectoryReader
assert ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(newSearcher.getDirectoryReader()) == elasticsearchDirectoryReader : "Wrapper hides actual ElasticsearchDirectoryReader but shouldn't";
return newSearcher;
}
}

View File

@ -73,7 +73,8 @@ public class IndexSearcherWrapperTests extends ESTestCase {
final AtomicInteger count = new AtomicInteger();
final AtomicInteger outerCount = new AtomicInteger();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher);
// sometimes double wrap....
final Engine.Searcher wrap = randomBoolean() ? wrapper.wrap(ENGINE_CONFIG, engineSearcher) : wrapper.wrap(ENGINE_CONFIG, wrapper.wrap(ENGINE_CONFIG, engineSearcher));
assertEquals(1, wrap.reader().getRefCount());
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> {
if (reader == open) {