enforce that wrappers delegate core cache key and ban getCombinedCoreAndDeletesKey() entirely
This commit is contained in:
parent
ec60018e34
commit
cac073dafa
|
@ -108,6 +108,4 @@ public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -52,11 +52,6 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
|
|||
return in.getCoreCacheKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCombinedCoreAndDeletesKey() {
|
||||
return in.getCombinedCoreAndDeletesKey();
|
||||
}
|
||||
|
||||
public static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
|
||||
if (reader instanceof FilterLeafReader) {
|
||||
if (reader instanceof ElasticsearchLeafReader) {
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.lucene.index.FilterDirectoryReader;
|
|||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
|
@ -67,8 +66,20 @@ public class IndexSearcherWrapper {
|
|||
if (elasticsearchDirectoryReader == null) {
|
||||
throw new IllegalStateException("Can't wrap non elasticsearch directory reader");
|
||||
}
|
||||
DirectoryReader reader = wrap(engineSearcher.getDirectoryReader());
|
||||
IndexSearcher innerIndexSearcher = new IndexSearcher(new CacheFriendlyReaderWrapper(reader, elasticsearchDirectoryReader));
|
||||
NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader());
|
||||
DirectoryReader reader = wrap(nonClosingReaderWrapper);
|
||||
if (reader != nonClosingReaderWrapper) {
|
||||
if (reader.getCoreCacheKey() != elasticsearchDirectoryReader.getCoreCacheKey()) {
|
||||
throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey, wrappers must override this method and delegate" +
|
||||
" to the original readers core cache key. Wrapped readers can't used as cache keys since their are used only per request which would lead to subtile bugs");
|
||||
}
|
||||
if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) {
|
||||
// prevent that somebody wraps with a non-filter reader
|
||||
throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't");
|
||||
}
|
||||
}
|
||||
|
||||
IndexSearcher innerIndexSearcher = new IndexSearcher(reader);
|
||||
innerIndexSearcher.setQueryCache(engineConfig.getQueryCache());
|
||||
innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy());
|
||||
innerIndexSearcher.setSimilarity(engineConfig.getSimilarity());
|
||||
|
@ -76,14 +87,16 @@ public class IndexSearcherWrapper {
|
|||
// For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten
|
||||
// This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times
|
||||
IndexSearcher indexSearcher = wrap(engineConfig, innerIndexSearcher);
|
||||
if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) {
|
||||
if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) {
|
||||
return engineSearcher;
|
||||
} else {
|
||||
Engine.Searcher newSearcher = new Engine.Searcher(engineSearcher.source(), indexSearcher) {
|
||||
final Engine.Searcher newSearcher = new Engine.Searcher(engineSearcher.source(), indexSearcher) {
|
||||
@Override
|
||||
public void close() throws ElasticsearchException {
|
||||
try {
|
||||
reader().close();
|
||||
// we close the reader to make sure wrappers can release resources if needed....
|
||||
// our NonClosingReaderWrapper makes sure that our reader is not closed
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("failed to close reader", e);
|
||||
} finally {
|
||||
|
@ -92,28 +105,24 @@ public class IndexSearcherWrapper {
|
|||
|
||||
}
|
||||
};
|
||||
// TODO should this be a real exception? this checks that our wrapper doesn't wrap in it's own ElasticsearchDirectoryReader
|
||||
assert ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(newSearcher.getDirectoryReader()) == elasticsearchDirectoryReader : "Wrapper hides actual ElasticsearchDirectoryReader but shouldn't";
|
||||
return newSearcher;
|
||||
}
|
||||
}
|
||||
|
||||
final class CacheFriendlyReaderWrapper extends FilterDirectoryReader {
|
||||
private final ElasticsearchDirectoryReader elasticsearchReader;
|
||||
final class NonClosingReaderWrapper extends FilterDirectoryReader {
|
||||
|
||||
private CacheFriendlyReaderWrapper(DirectoryReader in, ElasticsearchDirectoryReader elasticsearchReader) throws IOException {
|
||||
private NonClosingReaderWrapper(DirectoryReader in) throws IOException {
|
||||
super(in, new SubReaderWrapper() {
|
||||
@Override
|
||||
public LeafReader wrap(LeafReader reader) {
|
||||
return reader;
|
||||
}
|
||||
});
|
||||
this.elasticsearchReader = elasticsearchReader;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
|
||||
return new CacheFriendlyReaderWrapper(in, elasticsearchReader);
|
||||
return new NonClosingReaderWrapper(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -123,8 +132,7 @@ public class IndexSearcherWrapper {
|
|||
|
||||
@Override
|
||||
public Object getCoreCacheKey() {
|
||||
// this is important = we always use the ES reader core cache key on top level
|
||||
return elasticsearchReader.getCoreCacheKey();
|
||||
return in.getCoreCacheKey();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -67,10 +67,6 @@ public class ESDirectoryReaderTests extends ESTestCase {
|
|||
assertEquals(1, ir2.numDocs());
|
||||
assertEquals(1, ir2.leaves().size());
|
||||
assertSame(ir.leaves().get(0).reader().getCoreCacheKey(), ir2.leaves().get(0).reader().getCoreCacheKey());
|
||||
|
||||
// this is kind of stupid, but for now its here
|
||||
assertNotSame(ir.leaves().get(0).reader().getCombinedCoreAndDeletesKey(), ir2.leaves().get(0).reader().getCombinedCoreAndDeletesKey());
|
||||
|
||||
IOUtils.close(ir, ir2, iw, dir);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,23 +58,24 @@ public class IndexSearcherWrapperTests extends ESTestCase {
|
|||
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||
IndexSearcher searcher = new IndexSearcher(open);
|
||||
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
|
||||
final AtomicInteger closeCalls = new AtomicInteger(0);
|
||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||
@Override
|
||||
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||
return new FieldMaskingReader("field", reader);
|
||||
return new FieldMaskingReader("field", reader, closeCalls);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
|
||||
return searcher;
|
||||
}
|
||||
|
||||
};
|
||||
final int sourceRefCount = open.getRefCount();
|
||||
final AtomicInteger count = new AtomicInteger();
|
||||
final AtomicInteger outerCount = new AtomicInteger();
|
||||
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
|
||||
// sometimes double wrap....
|
||||
final Engine.Searcher wrap = randomBoolean() ? wrapper.wrap(ENGINE_CONFIG, engineSearcher) : wrapper.wrap(ENGINE_CONFIG, wrapper.wrap(ENGINE_CONFIG, engineSearcher));
|
||||
final Engine.Searcher wrap = wrapper.wrap(ENGINE_CONFIG, engineSearcher);
|
||||
assertEquals(1, wrap.reader().getRefCount());
|
||||
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), reader -> {
|
||||
if (reader == open) {
|
||||
|
@ -87,11 +88,13 @@ public class IndexSearcherWrapperTests extends ESTestCase {
|
|||
assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
|
||||
assertEquals(sourceRefCount, open.getRefCount());
|
||||
}
|
||||
assertEquals(1, closeCalls.get());
|
||||
|
||||
IOUtils.close(open, writer, dir);
|
||||
assertEquals(1, outerCount.get());
|
||||
assertEquals(1, count.get());
|
||||
assertEquals(0, open.getRefCount());
|
||||
assertEquals(1, closeCalls.get());
|
||||
}
|
||||
|
||||
public void testIsCacheable() throws IOException {
|
||||
|
@ -106,10 +109,11 @@ public class IndexSearcherWrapperTests extends ESTestCase {
|
|||
IndexSearcher searcher = new IndexSearcher(open);
|
||||
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
|
||||
searcher.setSimilarity(iwc.getSimilarity());
|
||||
final AtomicInteger closeCalls = new AtomicInteger(0);
|
||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||
@Override
|
||||
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||
return new FieldMaskingReader("field", reader);
|
||||
return new FieldMaskingReader("field", reader, closeCalls);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -127,10 +131,12 @@ public class IndexSearcherWrapperTests extends ESTestCase {
|
|||
cache.put(wrap.reader().getCoreCacheKey(), search);
|
||||
}
|
||||
}
|
||||
assertEquals(1, closeCalls.get());
|
||||
|
||||
assertEquals(1, cache.size());
|
||||
IOUtils.close(open, writer, dir);
|
||||
assertEquals(0, cache.size());
|
||||
assertEquals(1, closeCalls.get());
|
||||
}
|
||||
|
||||
public void testNoWrap() throws IOException {
|
||||
|
@ -153,21 +159,119 @@ public class IndexSearcherWrapperTests extends ESTestCase {
|
|||
IOUtils.close(open, writer, dir);
|
||||
}
|
||||
|
||||
public void testWrappedReaderMustDelegateCoreCacheKey() throws IOException {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||
IndexWriter writer = new IndexWriter(dir, iwc);
|
||||
Document doc = new Document();
|
||||
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||
doc.add(new TextField("field", "doc", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||
writer.addDocument(doc);
|
||||
DirectoryReader open = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", 1));
|
||||
IndexSearcher searcher = new IndexSearcher(open);
|
||||
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits);
|
||||
searcher.setSimilarity(iwc.getSimilarity());
|
||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
|
||||
@Override
|
||||
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||
return new BrokenWrapper(reader, false);
|
||||
}
|
||||
};
|
||||
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
|
||||
try {
|
||||
wrapper.wrap(ENGINE_CONFIG, engineSearcher);
|
||||
fail("reader must delegate cache key");
|
||||
} catch (IllegalStateException ex) {
|
||||
// all is well
|
||||
}
|
||||
}
|
||||
wrapper = new IndexSearcherWrapper() {
|
||||
@Override
|
||||
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
|
||||
return new BrokenWrapper(reader, true);
|
||||
}
|
||||
};
|
||||
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
|
||||
try {
|
||||
wrapper.wrap(ENGINE_CONFIG, engineSearcher);
|
||||
fail("reader must delegate cache key");
|
||||
} catch (IllegalStateException ex) {
|
||||
// all is well
|
||||
}
|
||||
}
|
||||
IOUtils.close(open, writer, dir);
|
||||
}
|
||||
|
||||
private static class FieldMaskingReader extends FilterDirectoryReader {
|
||||
private final String field;
|
||||
public FieldMaskingReader(String field, DirectoryReader in) throws IOException {
|
||||
private final AtomicInteger closeCalls;
|
||||
|
||||
public FieldMaskingReader(String field, DirectoryReader in, AtomicInteger closeCalls) throws IOException {
|
||||
super(in, new SubReaderWrapper() {
|
||||
@Override
|
||||
public LeafReader wrap(LeafReader reader) {
|
||||
return new FieldFilterLeafReader(reader, Collections.singleton(field), true);
|
||||
}
|
||||
});
|
||||
this.closeCalls = closeCalls;
|
||||
this.field = field;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
|
||||
return new FieldMaskingReader(field, in);
|
||||
return new FieldMaskingReader(field, in, closeCalls);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCoreCacheKey() {
|
||||
return in.getCoreCacheKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws IOException {
|
||||
super.doClose();
|
||||
closeCalls.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
private static class BrokenWrapper extends FilterDirectoryReader {
|
||||
|
||||
private final boolean hideDelegate;
|
||||
|
||||
public BrokenWrapper(DirectoryReader in, boolean hideDelegate) throws IOException {
|
||||
super(in, new SubReaderWrapper() {
|
||||
@Override
|
||||
public LeafReader wrap(LeafReader reader) {
|
||||
return reader;
|
||||
}
|
||||
});
|
||||
this.hideDelegate = hideDelegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
|
||||
return new BrokenWrapper(in, hideDelegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DirectoryReader getDelegate() {
|
||||
if (hideDelegate) {
|
||||
try {
|
||||
return ElasticsearchDirectoryReader.wrap(super.getDelegate(), new ShardId("foo", 1));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return super.getDelegate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCoreCacheKey() {
|
||||
if (hideDelegate == false) {
|
||||
return super.getCoreCacheKey();
|
||||
} else {
|
||||
return in.getCoreCacheKey();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
|
@ -928,27 +929,26 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
|
||||
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
|
||||
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
assertTrue(newShard.recoverFromStore(routing, localNode));
|
||||
routing = new ShardRouting(routing);
|
||||
ShardRoutingHelper.moveToStarted(routing);
|
||||
newShard.updateRoutingEntry(routing, true);
|
||||
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
|
||||
assertEquals(search.totalHits, 0);
|
||||
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
|
||||
assertEquals(search.totalHits, 1);
|
||||
}
|
||||
getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1"))));
|
||||
assertTrue(getResult.exists());
|
||||
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
|
||||
assertTrue(getResult.searcher().reader() instanceof FilterDirectoryReader);
|
||||
assertTrue(((FilterDirectoryReader) getResult.searcher().reader()).getDelegate() instanceof FieldMaskingReader);
|
||||
getResult.release();
|
||||
try {
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
assertTrue(newShard.recoverFromStore(routing, localNode));
|
||||
routing = new ShardRouting(routing);
|
||||
ShardRoutingHelper.moveToStarted(routing);
|
||||
newShard.updateRoutingEntry(routing, true);
|
||||
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
|
||||
assertEquals(search.totalHits, 0);
|
||||
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
|
||||
assertEquals(search.totalHits, 1);
|
||||
}
|
||||
getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1"))));
|
||||
assertTrue(getResult.exists());
|
||||
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
|
||||
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
|
||||
getResult.release();
|
||||
|
||||
// test global ordinals are evicted
|
||||
MappedFieldType foo = newShard.mapperService().indexName("foo");
|
||||
IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo);
|
||||
|
@ -967,6 +967,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
newShard.refresh("test");
|
||||
assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes());
|
||||
assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions());
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
throw t;
|
||||
} finally {
|
||||
newShard.close("just do it", randomBoolean());
|
||||
}
|
||||
|
@ -981,17 +984,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
private final String filteredField = field;
|
||||
@Override
|
||||
public LeafReader wrap(LeafReader reader) {
|
||||
return new FilterLeafReader(reader) {
|
||||
@Override
|
||||
public Fields fields() throws IOException {
|
||||
return new FilterFields(super.fields()) {
|
||||
@Override
|
||||
public Terms terms(String field) throws IOException {
|
||||
return filteredField.equals(field) ? null : super.terms(field);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
return new FieldFilterLeafReader(reader, Collections.singleton(field), true);
|
||||
}
|
||||
});
|
||||
this.field = field;
|
||||
|
@ -1002,5 +995,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
|
||||
return new FieldMaskingReader(field, in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCoreCacheKey() {
|
||||
return in.getCoreCacheKey();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -169,11 +169,6 @@ public final class MockEngineSupport {
|
|||
return in.getCoreCacheKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCombinedCoreAndDeletesKey() {
|
||||
return in.getCombinedCoreAndDeletesKey();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) {
|
||||
|
|
|
@ -91,3 +91,5 @@ java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which av
|
|||
java.lang.reflect.AccessibleObject#setAccessible(boolean)
|
||||
java.lang.reflect.AccessibleObject#setAccessible(java.lang.reflect.AccessibleObject[], boolean)
|
||||
|
||||
@defaultMessage this should not have been added to lucene in the first place
|
||||
org.apache.lucene.index.IndexReader#getCombinedCoreAndDeletesKey()
|
Loading…
Reference in New Issue