Merge pull request #14084 from s1monw/close_the_wrapper_only

Streamline top level reader close listeners and forbid general usage
This commit is contained in:
Simon Willnauer 2015-10-14 09:39:30 +02:00
commit 5828796848
35 changed files with 572 additions and 173 deletions

View File

@ -18,10 +18,8 @@
*/
package org.elasticsearch.common.lucene.index;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -76,4 +74,38 @@ public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {
}
}
/**
* Adds the given listener to the provided directory reader. The reader must contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
* otherwise we can't safely install the listener.
*
* @throws IllegalArgumentException if the reader doesn't contain an {@link ElasticsearchDirectoryReader} in it's hierarchy
*/
@SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener")
public static void addReaderCloseListener(DirectoryReader reader, IndexReader.ReaderClosedListener listener) {
ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader);
if (elasticsearchDirectoryReader != null) {
assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey();
elasticsearchDirectoryReader.addReaderClosedListener(listener);
return;
}
throw new IllegalArgumentException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader");
}
/**
* Tries to unwrap the given reader until the first {@link ElasticsearchDirectoryReader} instance is found or <code>null</code> if no instance is found;
*/
public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(DirectoryReader reader) {
if (reader instanceof FilterDirectoryReader) {
if (reader instanceof ElasticsearchDirectoryReader) {
return (ElasticsearchDirectoryReader) reader;
} else {
// We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate());
}
}
return null;
}
}

View File

@ -18,10 +18,7 @@
*/
package org.elasticsearch.common.lucene.index;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.*;
import org.elasticsearch.index.shard.ShardId;
/**
@ -38,7 +35,7 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
*
* @param in specified base reader.
*/
ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
public ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
super(in);
this.shardId = shardId;
}
@ -55,8 +52,18 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
return in.getCoreCacheKey();
}
@Override
public Object getCombinedCoreAndDeletesKey() {
return in.getCombinedCoreAndDeletesKey();
public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
if (reader instanceof FilterLeafReader) {
if (reader instanceof ElasticsearchLeafReader) {
return (ElasticsearchLeafReader) reader;
} else {
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
}
}
return null;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;

View File

@ -596,6 +596,13 @@ public abstract class Engine implements Closeable {
return searcher.getIndexReader();
}
public DirectoryReader getDirectoryReader() {
if (reader() instanceof DirectoryReader) {
return (DirectoryReader) reader();
}
throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
}
public IndexSearcher searcher() {
return searcher;
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
@ -905,9 +906,10 @@ public class InternalEngine extends Engine {
@Override
public void warm(LeafReader reader) throws IOException {
try {
assert isMergedSegment(reader);
LeafReader esLeafReader = new ElasticsearchLeafReader(reader, shardId);
assert isMergedSegment(esLeafReader);
if (warmer != null) {
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(reader, null));
final Engine.Searcher searcher = new Searcher("warmer", searcherFactory.newSearcher(esLeafReader, null));
final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
warmer.warmNewReaders(context);
}
@ -949,6 +951,12 @@ public class InternalEngine extends Engine {
@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
IndexSearcher searcher = super.newSearcher(reader, previousReader);
if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) {
// we call newSearcher from the IndexReaderWarmer which warms segments during merging
// in that case the reader is a LeafReader and all we need to do is to build a new Searcher
// and return it since it does it's own warming for that particular reader.
return searcher;
}
if (warmer != null) {
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
// we will release explicitly
@ -986,10 +994,11 @@ public class InternalEngine extends Engine {
}
if (newSearcher != null) {
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher));
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("new_reader_warming", newSearcher));
warmer.warmNewReaders(context);
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("top_reader_warming", searcher)));
} catch (Throwable e) {
if (isEngineClosed.get() == false) {
logger.warn("failed to prepare/warm", e);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
@ -235,11 +236,11 @@ public interface IndexFieldData<FD extends AtomicFieldData> extends IndexCompone
CircuitBreakerService breakerService, MapperService mapperService);
}
public static interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {
interface Global<FD extends AtomicFieldData> extends IndexFieldData<FD> {
IndexFieldData<FD> loadGlobal(IndexReader indexReader);
IndexFieldData<FD> loadGlobal(DirectoryReader indexReader);
IndexFieldData<FD> localGlobalDirect(IndexReader indexReader) throws Exception;
IndexFieldData<FD> localGlobalDirect(DirectoryReader indexReader) throws Exception;
}

View File

@ -19,10 +19,10 @@
package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.ShardId;
@ -33,7 +33,7 @@ public interface IndexFieldDataCache {
<FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(LeafReaderContext context, IFD indexFieldData) throws Exception;
<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception;
<FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception;
/**
* Clears all the field data stored cached in on this index.
@ -67,7 +67,7 @@ public interface IndexFieldDataCache {
@Override
@SuppressWarnings("unchecked")
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(IndexReader indexReader, IFD indexFieldData) throws Exception {
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(DirectoryReader indexReader, IFD indexFieldData) throws Exception {
return (IFD) indexFieldData.localGlobalDirect(indexReader);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
@ -33,12 +34,12 @@ public interface IndexOrdinalsFieldData extends IndexFieldData.Global<AtomicOrdi
* potentially from a cache.
*/
@Override
IndexOrdinalsFieldData loadGlobal(IndexReader indexReader);
IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader);
/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
@ -34,12 +35,12 @@ public interface IndexParentChildFieldData extends IndexFieldData.Global<AtomicP
* potentially from a cache.
*/
@Override
IndexParentChildFieldData loadGlobal(IndexReader indexReader);
IndexParentChildFieldData loadGlobal(DirectoryReader indexReader);
/**
* Load a global view of the ordinals for the given {@link IndexReader}.
*/
@Override
IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception;
IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception;
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.fielddata.ordinals;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.Accountable;
@ -59,12 +60,12 @@ public abstract class GlobalOrdinalsIndexFieldData extends AbstractIndexComponen
}
@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
return this;
}
@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return this;
}

View File

@ -59,7 +59,7 @@ public abstract class AbstractIndexOrdinalsFieldData extends AbstractIndexFieldD
}
@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
@ -76,7 +76,7 @@ public abstract class AbstractIndexOrdinalsFieldData extends AbstractIndexFieldD
}
@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger);
}

View File

@ -19,11 +19,7 @@
package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.*;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.settings.Settings;
@ -123,12 +119,12 @@ public class IndexIndexFieldData extends AbstractIndexOrdinalsFieldData {
}
@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
return this;
}
@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return loadGlobal(indexReader);
}

View File

@ -135,7 +135,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
}
@Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
@ -170,7 +170,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
}
@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
final long startTime = System.nanoTime();
long ramBytesUsed = 0;
@ -347,7 +347,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
}
@Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
public IndexParentChildFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) {
return this;
}
@ -355,7 +355,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
}
@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexParentChildFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return loadGlobal(indexReader);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticsearchException;
@ -61,7 +62,7 @@ public class SortedSetDVOrdinalsIndexFieldData extends DocValuesIndexFieldData i
}
@Override
public IndexOrdinalsFieldData loadGlobal(IndexReader indexReader) {
public IndexOrdinalsFieldData loadGlobal(DirectoryReader indexReader) {
if (indexReader.leaves().size() <= 1) {
// ordinals are already global
return this;
@ -78,7 +79,7 @@ public class SortedSetDVOrdinalsIndexFieldData extends DocValuesIndexFieldData i
}
@Override
public IndexOrdinalsFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
public IndexOrdinalsFieldData localGlobalDirect(DirectoryReader indexReader) throws Exception {
return GlobalOrdinalsBuilder.build(indexReader, this, indexSettings, breakerService, logger);
}
}

View File

@ -18,9 +18,11 @@
*/
package org.elasticsearch.index.query;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiDocValues;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.join.JoinUtil;
import org.apache.lucene.search.join.ScoreMode;
@ -287,12 +289,23 @@ public class HasChildQueryBuilder extends AbstractQueryBuilder<HasChildQueryBuil
if (getBoost() != 1.0F) {
return super.rewrite(reader);
}
String joinField = ParentFieldMapper.joinField(parentType);
IndexSearcher indexSearcher = new IndexSearcher(reader);
indexSearcher.setQueryCache(null);
IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal(indexSearcher.getIndexReader());
MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType);
return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren);
if (reader instanceof DirectoryReader) {
String joinField = ParentFieldMapper.joinField(parentType);
IndexSearcher indexSearcher = new IndexSearcher(reader);
indexSearcher.setQueryCache(null);
IndexParentChildFieldData indexParentChildFieldData = parentChildIndexFieldData.loadGlobal((DirectoryReader) reader);
MultiDocValues.OrdinalMap ordinalMap = ParentChildIndexFieldData.getOrdinalMap(indexParentChildFieldData, parentType);
return JoinUtil.createJoinQuery(joinField, innerQuery, toQuery, indexSearcher, scoreMode, ordinalMap, minChildren, maxChildren);
} else {
if (reader.leaves().isEmpty() && reader.numDocs() == 0) {
// asserting reader passes down a MultiReader during rewrite which makes this
// blow up since for this query to work we have to have a DirectoryReader otherwise
// we can't load global ordinals - for this to work we simply check if the reader has no leaves
// and rewrite to match nothing
return new MatchNoDocsQuery();
}
throw new IllegalStateException("can't load global ordinals for reader of type: " + reader.getClass() + " must be a DirectoryReader");
}
}
@Override

View File

@ -20,26 +20,38 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import java.io.IOException;
/**
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
* and {@link IndexSearcher} managed by the {@link Engine}.
* and {@link IndexSearcher} managed by the {@link IndexShard}.
*/
public interface IndexSearcherWrapper {
public class IndexSearcherWrapper {
/**
* Wraps the given {@link DirectoryReader}. The wrapped reader can filter out document just like delete documents etc. but
* must not change any term or document content.
* <p>
* NOTE: The wrapper has a per-request lifecycle, must delegate {@link IndexReader#getCoreCacheKey()} and must be an instance
* of {@link FilterDirectoryReader} that eventually exposes the original reader via {@link FilterDirectoryReader#getDelegate()}.
* The returned reader is closed once it goes out of scope.
* </p>
* @param reader The provided directory reader to be wrapped to add custom functionality
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
* the provided directory reader
*/
DirectoryReader wrap(DirectoryReader reader) throws IOException;
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
return reader;
}
/**
* @param engineConfig The engine config which can be used to get the query cache and query cache policy from
@ -48,34 +60,87 @@ public interface IndexSearcherWrapper {
* @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
* the provided index searcher
*/
IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException;
protected IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException {
return searcher;
}
/**
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
* gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned.
*
* This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search)
*/
default Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException {
DirectoryReader reader = wrap((DirectoryReader) engineSearcher.reader());
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException {
final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.getDirectoryReader());
if (elasticsearchDirectoryReader == null) {
throw new IllegalStateException("Can't wrap non elasticsearch directory reader");
}
NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader());
DirectoryReader reader = wrap(nonClosingReaderWrapper);
if (reader != nonClosingReaderWrapper) {
if (reader.getCoreCacheKey() != elasticsearchDirectoryReader.getCoreCacheKey()) {
throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey, wrappers must override this method and delegate" +
" to the original readers core cache key. Wrapped readers can't be used as cache keys since their are used only per request which would lead to subtle bugs");
}
if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) {
// prevent that somebody wraps with a non-filter reader
throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't");
}
}
final IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
// TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher);
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
final IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher);
if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
@Override
public void close() throws ElasticsearchException {
engineSearcher.close();
try {
reader().close();
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
} catch (IOException e) {
throw new ElasticsearchException("failed to close reader", e);
} finally {
engineSearcher.close();
}
}
};
}
}
private static final class NonClosingReaderWrapper extends FilterDirectoryReader {
private NonClosingReaderWrapper(DirectoryReader in) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
return reader;
}
});
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new NonClosingReaderWrapper(in);
}
@Override
protected void doClose() throws IOException {
// don't close here - mimic the MultiReader#doClose = false behavior that FilterDirectoryReader doesn't have
}
@Override
public Object getCoreCacheKey() {
return in.getCoreCacheKey();
}
}
}

View File

@ -34,7 +34,7 @@ public final class ShardUtils {
*/
@Nullable
public static ShardId extractShardId(LeafReader reader) {
final ElasticsearchLeafReader esReader = getElasticsearchLeafReader(reader);
final ElasticsearchLeafReader esReader = ElasticsearchLeafReader.getElasticsearchLeafReader(reader);
if (esReader != null) {
assert reader.getRefCount() > 0 : "ElasticsearchLeafReader is already closed";
return esReader.shardId();
@ -47,45 +47,14 @@ public final class ShardUtils {
* will return null.
*/
@Nullable
public static ShardId extractShardId(IndexReader reader) {
final ElasticsearchDirectoryReader esReader = getElasticsearchDirectoryReader(reader);
public static ShardId extractShardId(DirectoryReader reader) {
final ElasticsearchDirectoryReader esReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader);
if (esReader != null) {
return esReader.shardId();
}
if (!reader.leaves().isEmpty()) {
return extractShardId(reader.leaves().get(0).reader());
}
return null;
throw new IllegalArgumentException("can't extract shard ID, can't unwrap ElasticsearchDirectoryReader");
}
private static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
if (reader instanceof FilterLeafReader) {
if (reader instanceof ElasticsearchLeafReader) {
return (ElasticsearchLeafReader) reader;
} else {
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
}
}
return null;
}
private static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) {
if (reader instanceof FilterDirectoryReader) {
if (reader instanceof ElasticsearchDirectoryReader) {
return (ElasticsearchDirectoryReader) reader;
} else {
// We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate());
}
}
return null;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -180,6 +181,10 @@ public final class IndicesWarmer extends AbstractComponent {
return searcher.reader();
}
public DirectoryReader getDirectoryReader() {
return searcher.getDirectoryReader();
}
@Override
public String toString() {
return "WarmerContext: " + searcher.reader();

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.cache.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -248,7 +249,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
if (previous == null) {
context.searcher().getIndexReader().addReaderClosedListener(cleanupKey);
ElasticsearchDirectoryReader.addReaderCloseListener(context.searcher().getDirectoryReader(), cleanupKey);
}
}
} else {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.fielddata.cache;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
@ -31,6 +32,7 @@ import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -167,12 +169,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
}
@Override
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception {
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final DirectoryReader indexReader, final IFD indexFieldData) throws Exception {
final ShardId shardId = ShardUtils.extractShardId(indexReader);
final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId);
//noinspection unchecked
final Accountable accountable = cache.computeIfAbsent(key, k -> {
indexReader.addReaderClosedListener(IndexFieldCache.this);
ElasticsearchDirectoryReader.addReaderCloseListener(indexReader, IndexFieldCache.this);
for (Listener listener : this.listeners) {
k.listeners.add(listener);
}

View File

@ -1016,7 +1016,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
try {
final long start = System.nanoTime();
IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
ifd.loadGlobal(context.reader());
ifd.loadGlobal(context.getDirectoryReader());
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start));
}

View File

@ -18,12 +18,7 @@
*/
package org.elasticsearch.search.aggregations.support;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.util.Bits;
@ -146,7 +141,7 @@ public abstract class ValuesSource {
@Override
public RandomAccessOrds globalOrdinalsValues(LeafReaderContext context) {
final IndexOrdinalsFieldData global = indexFieldData.loadGlobal(context.parent.reader());
final IndexOrdinalsFieldData global = indexFieldData.loadGlobal((DirectoryReader)context.parent.reader());
final AtomicOrdinalsFieldData atomicFieldData = global.load(context);
return atomicFieldData.getOrdinalsValues();
}
@ -162,7 +157,7 @@ public abstract class ValuesSource {
}
public long globalMaxOrd(IndexSearcher indexSearcher, String type) {
IndexReader indexReader = indexSearcher.getIndexReader();
DirectoryReader indexReader = (DirectoryReader) indexSearcher.getIndexReader();
if (indexReader.leaves().isEmpty()) {
return 0;
} else {
@ -175,7 +170,7 @@ public abstract class ValuesSource {
}
public SortedDocValues globalOrdinalsValues(String type, LeafReaderContext context) {
final IndexParentChildFieldData global = indexFieldData.loadGlobal(context.parent.reader());
final IndexParentChildFieldData global = indexFieldData.loadGlobal((DirectoryReader)context.parent.reader());
final AtomicParentChildFieldData atomicFieldData = global.load(context);
return atomicFieldData.getOrdinalsValues(type);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.internal;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.*;
@ -40,9 +41,12 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private AggregatedDfs aggregatedDfs;
private final Engine.Searcher engineSearcher;
public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) {
super(searcher.reader());
in = searcher.searcher();
engineSearcher = searcher;
setSimilarity(searcher.searcher().getSimilarity(true));
setQueryCache(searchContext.getQueryCache());
setQueryCachingPolicy(searchContext.indexShard().getQueryCachingPolicy());
@ -104,4 +108,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
}
return collectionStatistics;
}
public DirectoryReader getDirectoryReader() {
return engineSearcher.getDirectoryReader();
}
}

View File

@ -67,10 +67,6 @@ public class ESDirectoryReaderTests extends ESTestCase {
assertEquals(1, ir2.numDocs());
assertEquals(1, ir2.leaves().size());
assertSame(ir.leaves().get(0).reader().getCoreCacheKey(), ir2.leaves().get(0).reader().getCoreCacheKey());
// this is kind of stupid, but for now its here
assertNotSame(ir.leaves().get(0).reader().getCombinedCoreAndDeletesKey(), ir2.leaves().get(0).reader().getCombinedCoreAndDeletesKey());
IOUtils.close(ir, ir2, iw, dir);
}
}

View File

@ -80,7 +80,7 @@ public class VersionsTests extends ESTestCase {
public void testVersions() throws Exception {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
DirectoryReader directoryReader = DirectoryReader.open(writer, true);
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
Document doc = new Document();
@ -148,7 +148,7 @@ public class VersionsTests extends ESTestCase {
docs.add(doc);
writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
DirectoryReader directoryReader = DirectoryReader.open(writer, true);
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5l));
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5l));
@ -174,7 +174,7 @@ public class VersionsTests extends ESTestCase {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
DirectoryReader directoryReader = DirectoryReader.open(writer, true);
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
Document doc = new Document();
@ -286,7 +286,7 @@ public class VersionsTests extends ESTestCase {
// Force merge and check versions
iw.forceMerge(1, true);
final LeafReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory()));
final LeafReader ir = SlowCompositeReaderWrapper.wrap(ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw.getDirectory()), new ShardId("foo", 1)));
final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME);
assertThat(versions, notNullValue());
for (int i = 0; i < ir.maxDoc(); ++i) {

View File

@ -54,7 +54,7 @@ public class IndexModuleTests extends ModuleTestCase {
assertInstanceBinding(module, IndexMetaData.class, (x) -> x == meta);
}
public static final class Wrapper implements IndexSearcherWrapper {
public static final class Wrapper extends IndexSearcherWrapper {
@Override
public DirectoryReader wrap(DirectoryReader reader) {

View File

@ -1499,7 +1499,7 @@ public class InternalEngineTests extends ESTestCase {
@Test
public void testExtractShardId() {
try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
ShardId shardId = ShardUtils.extractShardId(test.reader());
ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());
assertNotNull(shardId);
assertEquals(shardId, engine.config().getShardId());
}

View File

@ -855,7 +855,7 @@ public class ShadowEngineTests extends ESTestCase {
@Test
public void testExtractShardId() {
try (Engine.Searcher test = replicaEngine.acquireSearcher("test")) {
ShardId shardId = ShardUtils.extractShardId(test.reader());
ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());
assertNotNull(shardId);
assertEquals(shardId, replicaEngine.config().getShardId());
}

View File

@ -26,6 +26,7 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.Filter;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@ -35,6 +36,7 @@ import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.MapperBuilders;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.After;
@ -52,7 +54,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
protected MapperService mapperService;
protected IndexWriter writer;
protected LeafReaderContext readerContext;
protected IndexReader topLevelReader;
protected DirectoryReader topLevelReader;
protected IndicesFieldDataCache indicesFieldDataCache;
protected abstract FieldDataType getFieldDataType();
@ -112,7 +114,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
if (readerContext != null) {
readerContext.reader().close();
}
topLevelReader = DirectoryReader.open(writer, true);
topLevelReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader);
readerContext = reader.getContext();
return readerContext;

View File

@ -0,0 +1,277 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.similarities.DefaultSimilarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class IndexSearcherWrapperTests extends ESTestCase {
private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null);
public void testReaderCloseListenerIsCalled() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
IndexSearcher searcher = new IndexSearcher(open);
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
final AtomicInteger closeCalls = new AtomicInteger(0);
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new FieldMaskingReader("field", reader, closeCalls);
}
@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};
final int sourceRefCount = open.getRefCount();
final AtomicInteger count = new AtomicInteger();
final AtomicInteger outerCount = new AtomicInteger();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher);
assertEquals(1, wrap.reader().getRefCount());
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> {
if (reader == open) {
count.incrementAndGet();
}
outerCount.incrementAndGet();
});
assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits);
wrap.close();
assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
assertEquals(sourceRefCount, open.getRefCount());
}
assertEquals(1, closeCalls.get());
IOUtils.close(open, writer, dir);
assertEquals(1, outerCount.get());
assertEquals(1, count.get());
assertEquals(0, open.getRefCount());
assertEquals(1, closeCalls.get());
}
public void testIsCacheable() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
IndexSearcher searcher = new IndexSearcher(open);
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
searcher.setSimilarity(iwc.getSimilarity());
final AtomicInteger closeCalls = new AtomicInteger(0);
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new FieldMaskingReader("field", reader, closeCalls);
}
@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};
final ConcurrentHashMap<Object, TopDocs> cache = new ConcurrentHashMap<>();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
try (final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher)) {
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> {
cache.remove(reader.getCoreCacheKey());
});
TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
cache.put(wrap.reader().getCoreCacheKey(), search);
}
}
assertEquals(1, closeCalls.get());
assertEquals(1, cache.size());
IOUtils.close(open, writer, dir);
assertEquals(0, cache.size());
assertEquals(1, closeCalls.get());
}
public void testNoWrap() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
IndexSearcher searcher = new IndexSearcher(open);
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
searcher.setSimilarity(iwc.getSimilarity());
IndexSearcherWrapper wrapper = new IndexSearcherWrapper();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher);
assertSame(wrap, engineSearcher);
}
IOUtils.close(open, writer, dir);
}
public void testWrappedReaderMustDelegateCoreCacheKey() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
IndexSearcher searcher = new IndexSearcher(open);
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
searcher.setSimilarity(iwc.getSimilarity());
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new BrokenWrapper(reader, false);
}
};
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
try {
wrapper.wrap(ENGINE_CONFIG, engineSearcher);
fail("reader must delegate cache key");
} catch (IllegalStateException ex) {
// all is well
}
}
wrapper = new IndexSearcherWrapper() {
@Override
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new BrokenWrapper(reader, true);
}
};
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
try {
wrapper.wrap(ENGINE_CONFIG, engineSearcher);
fail("reader must delegate cache key");
} catch (IllegalStateException ex) {
// all is well
}
}
IOUtils.close(open, writer, dir);
}
private static class FieldMaskingReader extends FilterDirectoryReader {
private final String field;
private final AtomicInteger closeCalls;
public FieldMaskingReader(String field, DirectoryReader in, AtomicInteger closeCalls) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
return new FieldFilterLeafReader(reader, Collections.singleton(field), true);
}
});
this.closeCalls = closeCalls;
this.field = field;
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new FieldMaskingReader(field, in, closeCalls);
}
@Override
public Object getCoreCacheKey() {
return in.getCoreCacheKey();
}
@Override
protected void doClose() throws IOException {
super.doClose();
closeCalls.incrementAndGet();
}
}
private static class BrokenWrapper extends FilterDirectoryReader {
private final boolean hideDelegate;
public BrokenWrapper(DirectoryReader in, boolean hideDelegate) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
return reader;
}
});
this.hideDelegate = hideDelegate;
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new BrokenWrapper(in, hideDelegate);
}
@Override
public DirectoryReader getDelegate() {
if (hideDelegate) {
try {
return ElasticsearchDirectoryReader.wrap(super.getDelegate(), new ShardId("foo", 1));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return super.getDelegate();
}
@Override
public Object getCoreCacheKey() {
if (hideDelegate == false) {
return super.getCoreCacheKey();
} else {
return in.getCoreCacheKey();
}
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
@ -62,13 +63,12 @@ import org.elasticsearch.index.IndexServicesProvider;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
@ -88,6 +88,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
@ -897,7 +898,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(randomBoolean()).get();
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get();
Engine.GetResult getResult = shard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1"))));
@ -928,26 +929,48 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
try {
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
assertEquals(search.totalHits, 0);
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
assertEquals(search.totalHits, 1);
}
getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1"))));
assertTrue(getResult.exists());
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
getResult.release();
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
assertEquals(search.totalHits, 0);
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
assertEquals(search.totalHits, 1);
// test global ordinals are evicted
MappedFieldType foo = newShard.mapperService().indexName("foo");
IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo);
FieldDataStats before = shard.fieldData().stats("foo");
FieldDataStats after = null;
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
assumeTrue("we have to have more than one segment", searcher.getDirectoryReader().leaves().size() > 1);
IndexFieldData indexFieldData = ifd.loadGlobal(searcher.getDirectoryReader());
after = shard.fieldData().stats("foo");
assertEquals(after.getEvictions(), before.getEvictions());
assertTrue(indexFieldData.toString(), after.getMemorySizeInBytes() > before.getMemorySizeInBytes());
}
assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions());
assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), after.getMemorySizeInBytes());
newShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
newShard.refresh("test");
assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes());
assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions());
} finally {
newShard.close("just do it", randomBoolean());
}
getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1"))));
assertTrue(getResult.exists());
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
getResult.release();
newShard.close("just do it", randomBoolean());
}
private static class FieldMaskingReader extends FilterDirectoryReader {
@ -958,17 +981,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
private final String filteredField = field;
@Override
public LeafReader wrap(LeafReader reader) {
return new FilterLeafReader(reader) {
@Override
public Fields fields() throws IOException {
return new FilterFields(super.fields()) {
@Override
public Terms terms(String field) throws IOException {
return filteredField.equals(field) ? null : super.terms(field);
}
};
}
};
return new FieldFilterLeafReader(reader, Collections.singleton(field), true);
}
});
this.field = field;
@ -979,5 +992,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new FieldMaskingReader(field, in);
}
@Override
public Object getCoreCacheKey() {
return in.getCoreCacheKey();
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.common.logging.ESLogger;
@ -33,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
class AssertingSearcher extends Engine.Searcher {
private final Engine.Searcher wrappedSearcher;
private final ShardId shardId;
private final IndexSearcher indexSearcher;
private RuntimeException firstReleaseStack;
private final Object lock = new Object();
private final int initialRefCount;
@ -50,7 +50,6 @@ class AssertingSearcher extends Engine.Searcher {
this.logger = logger;
this.shardId = shardId;
initialRefCount = wrappedSearcher.reader().getRefCount();
this.indexSearcher = indexSearcher;
assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
}
@ -82,16 +81,6 @@ class AssertingSearcher extends Engine.Searcher {
}
}
@Override
public IndexReader reader() {
return indexSearcher.getIndexReader();
}
@Override
public IndexSearcher searcher() {
return indexSearcher;
}
public ShardId shardId() {
return shardId;
}

View File

@ -169,11 +169,6 @@ public final class MockEngineSupport {
return in.getCoreCacheKey();
}
@Override
public Object getCombinedCoreAndDeletesKey() {
return in.getCombinedCoreAndDeletesKey();
}
}
public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) {

View File

@ -90,3 +90,6 @@ java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which av
@defaultMessage Do not violate java's access system
java.lang.reflect.AccessibleObject#setAccessible(boolean)
java.lang.reflect.AccessibleObject#setAccessible(java.lang.reflect.AccessibleObject[], boolean)
@defaultMessage this should not have been added to lucene in the first place
org.apache.lucene.index.IndexReader#getCombinedCoreAndDeletesKey()

View File

@ -36,6 +36,10 @@ org.apache.lucene.index.IndexReader#decRef()
org.apache.lucene.index.IndexReader#incRef()
org.apache.lucene.index.IndexReader#tryIncRef()
@defaultMessage Close listeners can only installed via ElasticsearchDirectoryReader#addReaderCloseListener
org.apache.lucene.index.IndexReader#addReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener)
org.apache.lucene.index.IndexReader#removeReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener)
@defaultMessage Pass the precision step from the mappings explicitly instead
org.apache.lucene.search.NumericRangeQuery#newDoubleRange(java.lang.String,java.lang.Double,java.lang.Double,boolean,boolean)
org.apache.lucene.search.NumericRangeQuery#newFloatRange(java.lang.String,java.lang.Float,java.lang.Float,boolean,boolean)