Simplify Engine.Searcher creation (#28728)
Today we have several levels of indirection to acquire an Engine.Searcher. We first acquire a the reference manager for the scope then acquire an IndexSearcher and then create a searcher for the engine based on that. This change simplifies the creation into a single method call instead of 3 different ones.
This commit is contained in:
parent
13a8ba4740
commit
779bc6fd5c
|
@ -34,7 +34,6 @@ import org.apache.lucene.index.SegmentInfos;
|
|||
import org.apache.lucene.index.SegmentReader;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.ReferenceManager;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
|
@ -156,10 +155,6 @@ public abstract class Engine implements Closeable {
|
|||
return IndexWriter.SOURCE_MERGE.equals(source);
|
||||
}
|
||||
|
||||
protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) {
|
||||
return new EngineSearcher(source, searcher, manager, store, logger);
|
||||
}
|
||||
|
||||
public final EngineConfig config() {
|
||||
return engineConfig;
|
||||
}
|
||||
|
@ -510,38 +505,7 @@ public abstract class Engine implements Closeable {
|
|||
*
|
||||
* @see Searcher#close()
|
||||
*/
|
||||
public final Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
|
||||
boolean success = false;
|
||||
/* Acquire order here is store -> manager since we need
|
||||
* to make sure that the store is not closed before
|
||||
* the searcher is acquired. */
|
||||
store.incRef();
|
||||
try {
|
||||
final ReferenceManager<IndexSearcher> manager = getSearcherManager(source, scope); // can never be null
|
||||
/* This might throw NPE but that's fine we will run ensureOpen()
|
||||
* in the catch block and throw the right exception */
|
||||
final IndexSearcher searcher = manager.acquire();
|
||||
try {
|
||||
final Searcher retVal = newSearcher(source, searcher, manager);
|
||||
success = true;
|
||||
return retVal;
|
||||
} finally {
|
||||
if (!success) {
|
||||
manager.release(searcher);
|
||||
}
|
||||
}
|
||||
} catch (AlreadyClosedException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
ensureOpen(); // throw EngineCloseException here if we are already closed
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex);
|
||||
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
|
||||
} finally {
|
||||
if (!success) { // release the ref in the case of an error...
|
||||
store.decRef();
|
||||
}
|
||||
}
|
||||
}
|
||||
public abstract Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException;
|
||||
|
||||
public enum SearcherScope {
|
||||
EXTERNAL, INTERNAL
|
||||
|
@ -557,10 +521,18 @@ public abstract class Engine implements Closeable {
|
|||
|
||||
public abstract void syncTranslog() throws IOException;
|
||||
|
||||
protected void ensureOpen() {
|
||||
protected final void ensureOpen(Exception suppressed) {
|
||||
if (isClosed.get()) {
|
||||
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
|
||||
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
|
||||
if (suppressed != null) {
|
||||
ace.addSuppressed(suppressed);
|
||||
}
|
||||
throw ace;
|
||||
}
|
||||
}
|
||||
|
||||
protected final void ensureOpen() {
|
||||
ensureOpen(null);
|
||||
}
|
||||
|
||||
/** get commits stats for the last commit */
|
||||
|
@ -785,13 +757,8 @@ public abstract class Engine implements Closeable {
|
|||
the store is closed so we need to make sure we increment it here
|
||||
*/
|
||||
try {
|
||||
ReferenceManager<IndexSearcher> manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL);
|
||||
final IndexSearcher searcher = manager.acquire();
|
||||
try {
|
||||
final IndexReader r = searcher.getIndexReader();
|
||||
return ((DirectoryReader) r).isCurrent() == false;
|
||||
} finally {
|
||||
manager.release(searcher);
|
||||
try (Searcher searcher = acquireSearcher("refresh_needed", SearcherScope.EXTERNAL)) {
|
||||
return searcher.getDirectoryReader().isCurrent() == false;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("failed to access searcher manager", e);
|
||||
|
@ -1341,8 +1308,6 @@ public abstract class Engine implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope);
|
||||
|
||||
/**
|
||||
* Method to close the engine while the write lock is held.
|
||||
* Must decrement the supplied when closing work is done and resources are
|
||||
|
|
|
@ -26,23 +26,24 @@ import org.apache.lucene.search.SearcherManager;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Searcher for an Engine
|
||||
*/
|
||||
public class EngineSearcher extends Engine.Searcher {
|
||||
private final ReferenceManager<IndexSearcher> manager;
|
||||
final class EngineSearcher extends Engine.Searcher {
|
||||
private final AtomicBoolean released = new AtomicBoolean(false);
|
||||
private final Store store;
|
||||
private final Logger logger;
|
||||
private final ReferenceManager<IndexSearcher> referenceManager;
|
||||
|
||||
public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager, Store store, Logger logger) {
|
||||
super(source, searcher);
|
||||
this.manager = manager;
|
||||
EngineSearcher(String source, ReferenceManager<IndexSearcher> searcherReferenceManager, Store store, Logger logger) throws IOException {
|
||||
super(source, searcherReferenceManager.acquire());
|
||||
this.store = store;
|
||||
this.logger = logger;
|
||||
this.referenceManager = searcherReferenceManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -56,7 +57,7 @@ public class EngineSearcher extends Engine.Searcher {
|
|||
return;
|
||||
}
|
||||
try {
|
||||
manager.release(this.searcher());
|
||||
referenceManager.release(searcher());
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Cannot close", e);
|
||||
} catch (AlreadyClosedException e) {
|
||||
|
|
|
@ -20,9 +20,10 @@
|
|||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
|
@ -50,6 +51,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.lucene.LoggerInfoStream;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
|
@ -1683,7 +1685,7 @@ public class InternalEngine extends Engine {
|
|||
* and expected. We don't hold any locks while we block on forceMerge otherwise it would block
|
||||
* closing the engine as well. If we are not closed we pass it on to failOnTragicEvent which ensures
|
||||
* we are handling a tragic even exception here */
|
||||
ensureOpen();
|
||||
ensureOpen(ex);
|
||||
failOnTragicEvent(ex);
|
||||
throw ex;
|
||||
} catch (Exception e) {
|
||||
|
@ -1859,15 +1861,36 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope) {
|
||||
public Searcher acquireSearcher(String source, SearcherScope scope) {
|
||||
/* Acquire order here is store -> manager since we need
|
||||
* to make sure that the store is not closed before
|
||||
* the searcher is acquired. */
|
||||
store.incRef();
|
||||
Releasable releasable = store::decRef;
|
||||
try {
|
||||
final ReferenceManager<IndexSearcher> referenceManager;
|
||||
switch (scope) {
|
||||
case INTERNAL:
|
||||
return internalSearcherManager;
|
||||
referenceManager = internalSearcherManager;
|
||||
break;
|
||||
case EXTERNAL:
|
||||
return externalSearcherManager;
|
||||
referenceManager = externalSearcherManager;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("unknown scope: " + scope);
|
||||
}
|
||||
EngineSearcher engineSearcher = new EngineSearcher(source, referenceManager, store, logger);
|
||||
releasable = null; // success - hand over the reference to the engine searcher
|
||||
return engineSearcher;
|
||||
} catch (AlreadyClosedException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
ensureOpen(ex); // throw EngineCloseException here if we are already closed
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex);
|
||||
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
|
||||
} finally {
|
||||
Releasables.close(releasable);
|
||||
}
|
||||
}
|
||||
|
||||
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
|
||||
|
|
|
@ -133,9 +133,8 @@ public final class MockEngineSupport {
|
|||
}
|
||||
}
|
||||
|
||||
public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher,
|
||||
ReferenceManager<IndexSearcher> manager) throws EngineException {
|
||||
IndexReader reader = searcher.getIndexReader();
|
||||
public AssertingIndexSearcher newSearcher(Engine.Searcher searcher) throws EngineException {
|
||||
IndexReader reader = searcher.reader();
|
||||
IndexReader wrappedReader = reader;
|
||||
assert reader != null;
|
||||
if (reader instanceof DirectoryReader && mockContext.wrapReader) {
|
||||
|
@ -143,7 +142,7 @@ public final class MockEngineSupport {
|
|||
}
|
||||
// this executes basic query checks and asserts that weights are normalized only once etc.
|
||||
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
|
||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity(true));
|
||||
assertingIndexSearcher.setSimilarity(searcher.searcher().getSimilarity(true));
|
||||
assertingIndexSearcher.setQueryCache(filterCache);
|
||||
assertingIndexSearcher.setQueryCachingPolicy(filterCachingPolicy);
|
||||
return assertingIndexSearcher;
|
||||
|
@ -183,10 +182,9 @@ public final class MockEngineSupport {
|
|||
|
||||
}
|
||||
|
||||
public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher,
|
||||
ReferenceManager<IndexSearcher> manager) {
|
||||
final AssertingIndexSearcher assertingIndexSearcher = newSearcher(source, searcher, manager);
|
||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity(true));
|
||||
public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher) {
|
||||
final AssertingIndexSearcher assertingIndexSearcher = newSearcher(engineSearcher);
|
||||
assertingIndexSearcher.setSimilarity(engineSearcher.searcher().getSimilarity(true));
|
||||
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
|
||||
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
|
||||
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
|
||||
|
|
|
@ -19,9 +19,7 @@
|
|||
package org.elasticsearch.test.engine;
|
||||
|
||||
import org.apache.lucene.index.FilterDirectoryReader;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.ReferenceManager;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.InternalEngine;
|
||||
|
@ -79,8 +77,8 @@ final class MockInternalEngine extends InternalEngine {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) throws EngineException {
|
||||
final Searcher engineSearcher = super.newSearcher(source, searcher, manager);
|
||||
return support().wrapSearcher(source, engineSearcher, searcher, manager);
|
||||
public Searcher acquireSearcher(String source, SearcherScope scope) {
|
||||
final Searcher engineSearcher = super.acquireSearcher(source, scope);
|
||||
return support().wrapSearcher(source, engineSearcher);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue