Fold EngineSearcher into Engine.Searcher (#34082)

EngineSearcher can be easily folded into Engine.Searcher which removes
a level of inheritance that is unnecessary for most of its subclasses.
This change folds it into Engine.Searcher and removes the dependency on
ReferenceManager.
This commit is contained in:
Simon Willnauer 2018-09-27 09:06:04 +02:00 committed by GitHub
parent 0301062c6e
commit bda7bc145b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 60 additions and 110 deletions

View File

@ -43,6 +43,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
@ -663,7 +664,15 @@ public abstract class Engine implements Closeable {
}
Releasable releasable = store::decRef;
try {
EngineSearcher engineSearcher = new EngineSearcher(source, getReferenceManager(scope), store, logger);
ReferenceManager<IndexSearcher> referenceManager = getReferenceManager(scope);
Searcher engineSearcher = new Searcher(source, referenceManager.acquire(),
s -> {
try {
referenceManager.release(s);
} finally {
store.decRef();
}
}, logger);
releasable = null; // success - hand over the reference to the engine searcher
return engineSearcher;
} catch (AlreadyClosedException ex) {
@ -1167,40 +1176,67 @@ public abstract class Engine implements Closeable {
}
public static class Searcher implements Releasable {
private final String source;
private final IndexSearcher searcher;
private final AtomicBoolean released = new AtomicBoolean(false);
private final Logger logger;
private final IOUtils.IOConsumer<IndexSearcher> onClose;
public Searcher(String source, IndexSearcher searcher) {
public Searcher(String source, IndexSearcher searcher, Logger logger) {
this(source, searcher, s -> s.getIndexReader().close(), logger);
}
public Searcher(String source, IndexSearcher searcher, IOUtils.IOConsumer<IndexSearcher> onClose, Logger logger) {
this.source = source;
this.searcher = searcher;
this.onClose = onClose;
this.logger = logger;
}
/**
* The source that caused this searcher to be acquired.
*/
public String source() {
public final String source() {
return source;
}
public IndexReader reader() {
public final IndexReader reader() {
return searcher.getIndexReader();
}
public DirectoryReader getDirectoryReader() {
public final DirectoryReader getDirectoryReader() {
if (reader() instanceof DirectoryReader) {
return (DirectoryReader) reader();
}
throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
}
public IndexSearcher searcher() {
public final IndexSearcher searcher() {
return searcher;
}
@Override
public void close() {
// Nothing to close here
if (released.compareAndSet(false, true) == false) {
/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
* of time, this is why we only log a warning instead of throwing an exception.
*/
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
return;
}
try {
onClose.accept(searcher());
} catch (IOException e) {
throw new IllegalStateException("Cannot close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
}
}
public final Logger getLogger() {
return logger;
}
}

View File

@ -1,68 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Searcher for an Engine
 *
 * <p>Acquires an {@link IndexSearcher} from a {@link ReferenceManager} and ties its
 * lifetime to a {@link Store} reference: closing this searcher releases the acquired
 * searcher back to the reference manager and then decrements the store's ref count.
 */
final class EngineSearcher extends Engine.Searcher {
    // Guards close() so the release/decRef logic runs at most once even on a double close.
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Store store;
    private final Logger logger;
    private final ReferenceManager<IndexSearcher> referenceManager;

    /**
     * @param source                   the source that caused this searcher to be acquired
     * @param searcherReferenceManager manager an {@link IndexSearcher} is acquired from; the
     *                                 acquired reference is released again in {@link #close()}
     * @param store                    store whose reference is released on close
     *                                 (caller is assumed to have incremented it already)
     * @param logger                   logger used to report a double release
     * @throws IOException if acquiring the searcher from the reference manager fails
     */
    EngineSearcher(String source, ReferenceManager<IndexSearcher> searcherReferenceManager, Store store, Logger logger) throws IOException {
        super(source, searcherReferenceManager.acquire());
        this.store = store;
        this.logger = logger;
        this.referenceManager = searcherReferenceManager;
    }

    @Override
    public void close() {
        if (!released.compareAndSet(false, true)) {
            /* In general, searchers should never be released twice or this would break reference counting. There is one rare case
             * when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
             * of time, this is why we only log a warning instead of throwing an exception.
             */
            logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
            return;
        }
        try {
            referenceManager.release(searcher());
        } catch (IOException e) {
            throw new IllegalStateException("Cannot close", e);
        } catch (AlreadyClosedException e) {
            // This means there's a bug somewhere: don't suppress it
            throw new AssertionError(e);
        } finally {
            // Always give back the store reference, even if releasing the searcher failed.
            store.decRef();
        }
    }
}

View File

@ -606,7 +606,7 @@ public class InternalEngine extends Engine {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
}
} catch (IOException e) {
@ -2085,7 +2085,7 @@ public class InternalEngine extends Engine {
if (warmer != null) {
try {
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
warmer.warm(new Searcher("top_reader_warming", searcher));
warmer.warm(new Searcher("top_reader_warming", searcher, s -> {}, logger));
} catch (Exception e) {
if (isEngineClosed.get() == false) {
logger.warn("failed to prepare/warm", e);

View File

@ -24,8 +24,8 @@ import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import java.io.IOException;
@ -97,21 +97,10 @@ public class IndexSearcherWrapper {
if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
@Override
public void close() throws ElasticsearchException {
try {
reader().close();
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
} catch (IOException e) {
throw new ElasticsearchException("failed to close reader", e);
} finally {
engineSearcher.close();
}
}
};
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
return new Engine.Searcher(engineSearcher.source(), indexSearcher, s -> IOUtils.close(s.getIndexReader(), engineSearcher),
engineSearcher.getLogger());
}
}

View File

@ -73,7 +73,7 @@ public class IndexSearcherWrapperTests extends ESTestCase {
final int sourceRefCount = open.getRefCount();
final AtomicInteger count = new AtomicInteger();
final AtomicInteger outerCount = new AtomicInteger();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
final Engine.Searcher wrap = wrapper.wrap(engineSearcher);
assertEquals(1, wrap.reader().getRefCount());
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
@ -121,7 +121,7 @@ public class IndexSearcherWrapperTests extends ESTestCase {
}
};
final ConcurrentHashMap<Object, TopDocs> cache = new ConcurrentHashMap<>();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
try (Engine.Searcher wrap = wrapper.wrap(engineSearcher)) {
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
cache.remove(key);
@ -151,7 +151,7 @@ public class IndexSearcherWrapperTests extends ESTestCase {
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
searcher.setSimilarity(iwc.getSimilarity());
IndexSearcherWrapper wrapper = new IndexSearcherWrapper();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher)) {
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, logger)) {
final Engine.Searcher wrap = wrapper.wrap(engineSearcher);
assertSame(wrap, engineSearcher);
}

View File

@ -110,7 +110,7 @@ public class DefaultSearchContextTests extends ESTestCase {
try (Directory dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
IndexReader reader = w.getReader();
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader))) {
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), logger)) {
DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService,
indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);

View File

@ -82,7 +82,7 @@ public class QueryProfilerTests extends ESTestCase {
}
reader = w.getReader();
w.close();
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader));
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), null);
searcher = new ContextIndexSearcher(engineSearcher, IndexSearcher.getDefaultQueryCache(), MAYBE_CACHE_POLICY);
}
@ -363,7 +363,7 @@ public class QueryProfilerTests extends ESTestCase {
public void testApproximations() throws IOException {
QueryProfiler profiler = new QueryProfiler();
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader));
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), logger);
// disable query caching since we want to test approximations, which won't
// be exposed on a cached entry
ContextIndexSearcher searcher = new ContextIndexSearcher(engineSearcher, null, MAYBE_CACHE_POLICY);

View File

@ -240,7 +240,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
}
protected SearchContext createSearchContext(IndexSearcher indexSearcher, IndexSettings indexSettings) {
Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher);
Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, logger);
QueryCache queryCache = new DisabledQueryCache(indexSettings);
QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() {
@Override

View File

@ -38,10 +38,8 @@ class AssertingSearcher extends Engine.Searcher {
private final Logger logger;
private final AtomicBoolean closed = new AtomicBoolean(false);
AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher,
ShardId shardId,
Logger logger) {
super(wrappedSearcher.source(), indexSearcher);
AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher, ShardId shardId, Logger logger) {
super(wrappedSearcher.source(), indexSearcher, s -> {throw new AssertionError();}, logger);
// we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
// with a wrapped reader.
this.wrappedSearcher = wrappedSearcher;
@ -52,11 +50,6 @@ class AssertingSearcher extends Engine.Searcher {
"IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
}
@Override
public String source() {
return wrappedSearcher.source();
}
@Override
public void close() {
synchronized (lock) {