Streamline top level reader close listeners and forbid general usage
IndexReader#addReaderCloseListener is very error prone when it comes to caching and reader wrapping. The listeners are not delegated to the sub readers nor can it's implementation change since it's final in the base class. This commit only allows installing close listeners on the top level ElasticsearchDirecotryReader which is known to work an has a defined lifetime which corresponds to its subreader. This ensure that cachesa re cleared once the reader goes out of scope.
This commit is contained in:
parent
077a401c28
commit
d3436ff592
|
@ -18,10 +18,8 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.common.lucene.index;
|
package org.elasticsearch.common.lucene.index;
|
||||||
|
|
||||||
import org.apache.lucene.index.DirectoryReader;
|
import org.apache.lucene.index.*;
|
||||||
import org.apache.lucene.index.FilterDirectoryReader;
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.apache.lucene.index.FilterLeafReader;
|
|
||||||
import org.apache.lucene.index.LeafReader;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -76,4 +74,38 @@ public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressForbidden(reason = "This is the only sane way to add a ReaderClosedListener")
|
||||||
|
public static void addReaderCloseListener(IndexReader reader, IndexReader.ReaderClosedListener listener) {
|
||||||
|
ElasticsearchDirectoryReader elasticsearchDirectoryReader = getElasticsearchDirectoryReader(reader);
|
||||||
|
if (elasticsearchDirectoryReader == null && reader instanceof LeafReader) {
|
||||||
|
ElasticsearchLeafReader leafReader = ElasticsearchLeafReader.getElasticsearchLeafReader((LeafReader) reader);
|
||||||
|
if (leafReader != null) {
|
||||||
|
assert reader.getCoreCacheKey() == leafReader.getCoreCacheKey();
|
||||||
|
leafReader.addReaderClosedListener(listener);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
assert reader.getCoreCacheKey() == elasticsearchDirectoryReader.getCoreCacheKey();
|
||||||
|
elasticsearchDirectoryReader.addReaderClosedListener(listener);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("Can't install close listener reader is not an ElasticsearchDirectoryReader/ElasticsearchLeafReader");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) {
|
||||||
|
if (reader instanceof FilterDirectoryReader) {
|
||||||
|
if (reader instanceof ElasticsearchDirectoryReader) {
|
||||||
|
return (ElasticsearchDirectoryReader) reader;
|
||||||
|
} else {
|
||||||
|
// We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because
|
||||||
|
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
|
||||||
|
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
|
||||||
|
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
|
||||||
|
return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,4 +59,19 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
|
||||||
public Object getCombinedCoreAndDeletesKey() {
|
public Object getCombinedCoreAndDeletesKey() {
|
||||||
return in.getCombinedCoreAndDeletesKey();
|
return in.getCombinedCoreAndDeletesKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
|
||||||
|
if (reader instanceof FilterLeafReader) {
|
||||||
|
if (reader instanceof ElasticsearchLeafReader) {
|
||||||
|
return (ElasticsearchLeafReader) reader;
|
||||||
|
} else {
|
||||||
|
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
|
||||||
|
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
|
||||||
|
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
|
||||||
|
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
|
||||||
|
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.index.fielddata;
|
||||||
import org.apache.lucene.index.LeafReaderContext;
|
import org.apache.lucene.index.LeafReaderContext;
|
||||||
import org.apache.lucene.index.IndexReader;
|
import org.apache.lucene.index.IndexReader;
|
||||||
import org.apache.lucene.util.Accountable;
|
import org.apache.lucene.util.Accountable;
|
||||||
import org.elasticsearch.index.mapper.FieldMapper;
|
|
||||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
|
|
@ -20,11 +20,14 @@
|
||||||
package org.elasticsearch.index.shard;
|
package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import org.apache.lucene.index.DirectoryReader;
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
import org.apache.lucene.index.FilterDirectoryReader;
|
||||||
|
import org.apache.lucene.index.LeafReader;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.engine.EngineException;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -32,14 +35,16 @@ import java.io.IOException;
|
||||||
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
|
* Extension point to add custom functionality at request time to the {@link DirectoryReader}
|
||||||
* and {@link IndexSearcher} managed by the {@link Engine}.
|
* and {@link IndexSearcher} managed by the {@link Engine}.
|
||||||
*/
|
*/
|
||||||
public interface IndexSearcherWrapper {
|
public class IndexSearcherWrapper {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param reader The provided directory reader to be wrapped to add custom functionality
|
* @param reader The provided directory reader to be wrapped to add custom functionality
|
||||||
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
|
* @return a new directory reader wrapping the provided directory reader or if no wrapping was performed
|
||||||
* the provided directory reader
|
* the provided directory reader
|
||||||
*/
|
*/
|
||||||
DirectoryReader wrap(DirectoryReader reader) throws IOException;
|
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||||
|
return reader;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param engineConfig The engine config which can be used to get the query cache and query cache policy from
|
* @param engineConfig The engine config which can be used to get the query cache and query cache policy from
|
||||||
|
@ -48,17 +53,22 @@ public interface IndexSearcherWrapper {
|
||||||
* @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
|
* @return a new index searcher wrapping the provided index searcher or if no wrapping was performed
|
||||||
* the provided index searcher
|
* the provided index searcher
|
||||||
*/
|
*/
|
||||||
IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException;
|
protected IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws IOException {
|
||||||
|
return searcher;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
|
* If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher
|
||||||
* gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned.
|
* gets wrapped and a new {@link Engine.Searcher} instances is returned, otherwise the provided {@link Engine.Searcher} is returned.
|
||||||
*
|
*
|
||||||
* This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search)
|
* This is invoked each time a {@link Engine.Searcher} is requested to do an operation. (for example search)
|
||||||
*/
|
*/
|
||||||
default Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException {
|
public final Engine.Searcher wrap(EngineConfig engineConfig, Engine.Searcher engineSearcher) throws IOException {
|
||||||
DirectoryReader reader = wrap((DirectoryReader) engineSearcher.reader());
|
final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.reader());
|
||||||
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
|
if (elasticsearchDirectoryReader == null) {
|
||||||
|
throw new IllegalStateException("Can't wrap non elasticsearch directory reader");
|
||||||
|
}
|
||||||
|
DirectoryReader reader = wrap((DirectoryReader)engineSearcher.reader());
|
||||||
|
IndexSearcher innerIndexSearcher = new IndexSearcher(new CacheFriendlyReaderWrapper(reader, elasticsearchDirectoryReader));
|
||||||
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
|
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
|
||||||
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
|
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
|
||||||
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
|
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
|
||||||
|
@ -72,10 +82,47 @@ public interface IndexSearcherWrapper {
|
||||||
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
|
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
|
||||||
@Override
|
@Override
|
||||||
public void close() throws ElasticsearchException {
|
public void close() throws ElasticsearchException {
|
||||||
engineSearcher.close();
|
try {
|
||||||
|
reader().close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ElasticsearchException("failed to close reader", e);
|
||||||
|
} finally {
|
||||||
|
engineSearcher.close();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final class CacheFriendlyReaderWrapper extends FilterDirectoryReader {
|
||||||
|
private final ElasticsearchDirectoryReader elasticsearchReader;
|
||||||
|
|
||||||
|
private CacheFriendlyReaderWrapper(DirectoryReader in, ElasticsearchDirectoryReader elasticsearchReader) throws IOException {
|
||||||
|
super(in, new SubReaderWrapper() {
|
||||||
|
@Override
|
||||||
|
public LeafReader wrap(LeafReader reader) {
|
||||||
|
return reader;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.elasticsearchReader = elasticsearchReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
|
||||||
|
return new CacheFriendlyReaderWrapper(in, elasticsearchReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doClose() throws IOException {
|
||||||
|
// don't close here - mimic the MultiReader#doClose = false behavior that FilterDirectoryReader doesn't have
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getCoreCacheKey() {
|
||||||
|
// this is important = we always use the ES reader core cache key on top level
|
||||||
|
return elasticsearchReader.getCoreCacheKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ public final class ShardUtils {
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public static ShardId extractShardId(LeafReader reader) {
|
public static ShardId extractShardId(LeafReader reader) {
|
||||||
final ElasticsearchLeafReader esReader = getElasticsearchLeafReader(reader);
|
final ElasticsearchLeafReader esReader = ElasticsearchLeafReader.getElasticsearchLeafReader(reader);
|
||||||
if (esReader != null) {
|
if (esReader != null) {
|
||||||
assert reader.getRefCount() > 0 : "ElasticsearchLeafReader is already closed";
|
assert reader.getRefCount() > 0 : "ElasticsearchLeafReader is already closed";
|
||||||
return esReader.shardId();
|
return esReader.shardId();
|
||||||
|
@ -48,7 +48,7 @@ public final class ShardUtils {
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public static ShardId extractShardId(IndexReader reader) {
|
public static ShardId extractShardId(IndexReader reader) {
|
||||||
final ElasticsearchDirectoryReader esReader = getElasticsearchDirectoryReader(reader);
|
final ElasticsearchDirectoryReader esReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader);
|
||||||
if (esReader != null) {
|
if (esReader != null) {
|
||||||
return esReader.shardId();
|
return esReader.shardId();
|
||||||
}
|
}
|
||||||
|
@ -58,34 +58,6 @@ public final class ShardUtils {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
|
|
||||||
if (reader instanceof FilterLeafReader) {
|
|
||||||
if (reader instanceof ElasticsearchLeafReader) {
|
|
||||||
return (ElasticsearchLeafReader) reader;
|
|
||||||
} else {
|
|
||||||
// We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because
|
|
||||||
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
|
|
||||||
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
|
|
||||||
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
|
|
||||||
return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) {
|
|
||||||
if (reader instanceof FilterDirectoryReader) {
|
|
||||||
if (reader instanceof ElasticsearchDirectoryReader) {
|
|
||||||
return (ElasticsearchDirectoryReader) reader;
|
|
||||||
} else {
|
|
||||||
// We need to use FilterDirectoryReader#getDelegate and not FilterDirectoryReader#unwrap, because
|
|
||||||
// If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately
|
|
||||||
// returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that
|
|
||||||
// may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId.
|
|
||||||
return getElasticsearchDirectoryReader(((FilterDirectoryReader) reader).getDelegate());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.cache.*;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.MemorySizeValue;
|
import org.elasticsearch.common.unit.MemorySizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -248,7 +249,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
||||||
if (!registeredClosedListeners.containsKey(cleanupKey)) {
|
if (!registeredClosedListeners.containsKey(cleanupKey)) {
|
||||||
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
|
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
|
||||||
if (previous == null) {
|
if (previous == null) {
|
||||||
context.searcher().getIndexReader().addReaderClosedListener(cleanupKey);
|
ElasticsearchDirectoryReader.addReaderCloseListener(context.searcher().getIndexReader(), cleanupKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.cache.RemovalNotification;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -172,7 +173,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
||||||
final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId);
|
final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId);
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
final Accountable accountable = cache.computeIfAbsent(key, k -> {
|
final Accountable accountable = cache.computeIfAbsent(key, k -> {
|
||||||
indexReader.addReaderClosedListener(IndexFieldCache.this);
|
ElasticsearchDirectoryReader.addReaderCloseListener(indexReader, IndexFieldCache.this);
|
||||||
for (Listener listener : this.listeners) {
|
for (Listener listener : this.listeners) {
|
||||||
k.listeners.add(listener);
|
k.listeners.add(listener);
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class VersionsTests extends ESTestCase {
|
||||||
public void testVersions() throws Exception {
|
public void testVersions() throws Exception {
|
||||||
Directory dir = newDirectory();
|
Directory dir = newDirectory();
|
||||||
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
||||||
DirectoryReader directoryReader = DirectoryReader.open(writer, true);
|
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||||
MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||||
|
|
||||||
Document doc = new Document();
|
Document doc = new Document();
|
||||||
|
@ -148,7 +148,7 @@ public class VersionsTests extends ESTestCase {
|
||||||
docs.add(doc);
|
docs.add(doc);
|
||||||
|
|
||||||
writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
|
writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs);
|
||||||
DirectoryReader directoryReader = DirectoryReader.open(writer, true);
|
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||||
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5l));
|
assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5l));
|
||||||
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5l));
|
assertThat(Versions.loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5l));
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ public class VersionsTests extends ESTestCase {
|
||||||
Directory dir = newDirectory();
|
Directory dir = newDirectory();
|
||||||
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
|
||||||
|
|
||||||
DirectoryReader directoryReader = DirectoryReader.open(writer, true);
|
DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||||
MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
MatcherAssert.assertThat(Versions.loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND));
|
||||||
|
|
||||||
Document doc = new Document();
|
Document doc = new Document();
|
||||||
|
@ -286,7 +286,7 @@ public class VersionsTests extends ESTestCase {
|
||||||
|
|
||||||
// Force merge and check versions
|
// Force merge and check versions
|
||||||
iw.forceMerge(1, true);
|
iw.forceMerge(1, true);
|
||||||
final LeafReader ir = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(iw.getDirectory()));
|
final LeafReader ir = SlowCompositeReaderWrapper.wrap(ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw.getDirectory()), new ShardId("foo", 1)));
|
||||||
final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME);
|
final NumericDocValues versions = ir.getNumericDocValues(VersionFieldMapper.NAME);
|
||||||
assertThat(versions, notNullValue());
|
assertThat(versions, notNullValue());
|
||||||
for (int i = 0; i < ir.maxDoc(); ++i) {
|
for (int i = 0; i < ir.maxDoc(); ++i) {
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class IndexModuleTests extends ModuleTestCase {
|
||||||
assertInstanceBinding(module, IndexMetaData.class, (x) -> x == meta);
|
assertInstanceBinding(module, IndexMetaData.class, (x) -> x == meta);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final class Wrapper implements IndexSearcherWrapper {
|
public static final class Wrapper extends IndexSearcherWrapper {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DirectoryReader wrap(DirectoryReader reader) {
|
public DirectoryReader wrap(DirectoryReader reader) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.document.StringField;
|
||||||
import org.apache.lucene.index.*;
|
import org.apache.lucene.index.*;
|
||||||
import org.apache.lucene.search.Filter;
|
import org.apache.lucene.search.Filter;
|
||||||
import org.apache.lucene.store.RAMDirectory;
|
import org.apache.lucene.store.RAMDirectory;
|
||||||
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||||
|
@ -35,6 +36,7 @@ import org.elasticsearch.index.mapper.Mapper.BuilderContext;
|
||||||
import org.elasticsearch.index.mapper.MapperBuilders;
|
import org.elasticsearch.index.mapper.MapperBuilders;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
|
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -112,7 +114,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
|
||||||
if (readerContext != null) {
|
if (readerContext != null) {
|
||||||
readerContext.reader().close();
|
readerContext.reader().close();
|
||||||
}
|
}
|
||||||
topLevelReader = DirectoryReader.open(writer, true);
|
topLevelReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||||
LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader);
|
LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader);
|
||||||
readerContext = reader.getContext();
|
readerContext = reader.getContext();
|
||||||
return readerContext;
|
return readerContext;
|
||||||
|
|
|
@ -0,0 +1,172 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.document.Field;
|
||||||
|
import org.apache.lucene.document.StringField;
|
||||||
|
import org.apache.lucene.document.TextField;
|
||||||
|
import org.apache.lucene.index.*;
|
||||||
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.search.QueryCachingPolicy;
|
||||||
|
import org.apache.lucene.search.TermQuery;
|
||||||
|
import org.apache.lucene.search.TopDocs;
|
||||||
|
import org.apache.lucene.search.similarities.DefaultSimilarity;
|
||||||
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.engine.Engine;
|
||||||
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
|
import org.elasticsearch.index.engine.EngineException;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class IndexSearcherWrapperTests extends ESTestCase {
|
||||||
|
private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null);
|
||||||
|
|
||||||
|
public void testReaderCloseListenerIsCalled() throws IOException {
|
||||||
|
Directory dir = newDirectory();
|
||||||
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
|
IndexWriter writer = new IndexWriter(dir, iwc);
|
||||||
|
Document doc = new Document();
|
||||||
|
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
writer.addDocument(doc);
|
||||||
|
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||||
|
IndexSearcher searcher = new IndexSearcher(open);
|
||||||
|
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
|
||||||
|
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||||
|
@Override
|
||||||
|
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||||
|
return new FieldMaskingReader("field", reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
|
||||||
|
return searcher;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final int sourceRefCount = open.getRefCount();
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
final AtomicInteger outerCount = new AtomicInteger();
|
||||||
|
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
|
||||||
|
final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher);
|
||||||
|
assertEquals(1, wrap.reader().getRefCount());
|
||||||
|
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.reader(), reader -> {
|
||||||
|
if (reader == open) {
|
||||||
|
count.incrementAndGet();
|
||||||
|
}
|
||||||
|
outerCount.incrementAndGet();
|
||||||
|
});
|
||||||
|
assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits);
|
||||||
|
wrap.close();
|
||||||
|
assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
|
||||||
|
assertEquals(sourceRefCount, open.getRefCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
IOUtils.close(open, writer, dir);
|
||||||
|
assertEquals(1, outerCount.get());
|
||||||
|
assertEquals(1, count.get());
|
||||||
|
assertEquals(0, open.getRefCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testIsCacheable() throws IOException {
|
||||||
|
Directory dir = newDirectory();
|
||||||
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
|
IndexWriter writer = new IndexWriter(dir, iwc);
|
||||||
|
Document doc = new Document();
|
||||||
|
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
writer.addDocument(doc);
|
||||||
|
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||||
|
IndexSearcher searcher = new IndexSearcher(open);
|
||||||
|
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
|
||||||
|
searcher.setSimilarity(iwc.getSimilarity());
|
||||||
|
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||||
|
@Override
|
||||||
|
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||||
|
return new FieldMaskingReader("field", reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
|
||||||
|
return searcher;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final ConcurrentHashMap<Object, TopDocs> cache = new ConcurrentHashMap<>();
|
||||||
|
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
|
||||||
|
try (final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher)) {
|
||||||
|
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.reader(), reader -> {
|
||||||
|
cache.remove(reader.getCoreCacheKey());
|
||||||
|
});
|
||||||
|
TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
|
||||||
|
cache.put(wrap.reader().getCoreCacheKey(), search);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(1, cache.size());
|
||||||
|
IOUtils.close(open, writer, dir);
|
||||||
|
assertEquals(0, cache.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNoWrap() throws IOException {
|
||||||
|
Directory dir = newDirectory();
|
||||||
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
|
IndexWriter writer = new IndexWriter(dir, iwc);
|
||||||
|
Document doc = new Document();
|
||||||
|
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
|
writer.addDocument(doc);
|
||||||
|
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||||
|
IndexSearcher searcher = new IndexSearcher(open);
|
||||||
|
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
|
||||||
|
searcher.setSimilarity(iwc.getSimilarity());
|
||||||
|
IndexSearcherWrapper wrapper = new IndexSearcherWrapper();
|
||||||
|
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
|
||||||
|
final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher);
|
||||||
|
assertSame(wrap, engineSearcher);
|
||||||
|
}
|
||||||
|
IOUtils.close(open, writer, dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class FieldMaskingReader extends FilterDirectoryReader {
|
||||||
|
private final String field;
|
||||||
|
public FieldMaskingReader(String field, DirectoryReader in) throws IOException {
|
||||||
|
super(in, new SubReaderWrapper() {
|
||||||
|
@Override
|
||||||
|
public LeafReader wrap(LeafReader reader) {
|
||||||
|
return new FieldFilterLeafReader(reader, Collections.singleton(field), true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.field = field;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
|
||||||
|
return new FieldMaskingReader(field, in);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -945,7 +945,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
||||||
getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1"))));
|
getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1"))));
|
||||||
assertTrue(getResult.exists());
|
assertTrue(getResult.exists());
|
||||||
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
|
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
|
||||||
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
|
assertTrue(getResult.searcher().reader() instanceof FilterDirectoryReader);
|
||||||
|
assertTrue(((FilterDirectoryReader)getResult.searcher().reader()).getDelegate() instanceof FieldMaskingReader);
|
||||||
getResult.release();
|
getResult.release();
|
||||||
newShard.close("just do it", randomBoolean());
|
newShard.close("just do it", randomBoolean());
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,3 +90,4 @@ java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which av
|
||||||
@defaultMessage Do not violate java's access system
|
@defaultMessage Do not violate java's access system
|
||||||
java.lang.reflect.AccessibleObject#setAccessible(boolean)
|
java.lang.reflect.AccessibleObject#setAccessible(boolean)
|
||||||
java.lang.reflect.AccessibleObject#setAccessible(java.lang.reflect.AccessibleObject[], boolean)
|
java.lang.reflect.AccessibleObject#setAccessible(java.lang.reflect.AccessibleObject[], boolean)
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,10 @@ org.apache.lucene.index.IndexReader#decRef()
|
||||||
org.apache.lucene.index.IndexReader#incRef()
|
org.apache.lucene.index.IndexReader#incRef()
|
||||||
org.apache.lucene.index.IndexReader#tryIncRef()
|
org.apache.lucene.index.IndexReader#tryIncRef()
|
||||||
|
|
||||||
|
@defaultMessage Close listeners can only installed via ElasticsearchDirectoryReader#addReaderCloseListener
|
||||||
|
org.apache.lucene.index.IndexReader#addReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener)
|
||||||
|
org.apache.lucene.index.IndexReader#removeReaderClosedListener(org.apache.lucene.index.IndexReader$ReaderClosedListener)
|
||||||
|
|
||||||
@defaultMessage Pass the precision step from the mappings explicitly instead
|
@defaultMessage Pass the precision step from the mappings explicitly instead
|
||||||
org.apache.lucene.search.NumericRangeQuery#newDoubleRange(java.lang.String,java.lang.Double,java.lang.Double,boolean,boolean)
|
org.apache.lucene.search.NumericRangeQuery#newDoubleRange(java.lang.String,java.lang.Double,java.lang.Double,boolean,boolean)
|
||||||
org.apache.lucene.search.NumericRangeQuery#newFloatRange(java.lang.String,java.lang.Float,java.lang.Float,boolean,boolean)
|
org.apache.lucene.search.NumericRangeQuery#newFloatRange(java.lang.String,java.lang.Float,java.lang.Float,boolean,boolean)
|
||||||
|
|
Loading…
Reference in New Issue