Move up acquireSearcher logic to Engine (#33453)

By moving the logic to acquire the searcher up to the engine
it's simpler to build new engines that are for instance read-only.
This commit is contained in:
Simon Willnauer 2018-09-06 18:48:05 +02:00 committed by GitHub
parent 8ce4ceb59e
commit c6c456e8cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 48 deletions

View File

@ -33,6 +33,7 @@ import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
@ -569,7 +570,31 @@ public abstract class Engine implements Closeable {
* *
* @see Searcher#close() * @see Searcher#close()
*/ */
public abstract Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException; public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
if (store.tryIncRef() == false) {
throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get());
}
Releasable releasable = store::decRef;
try {
EngineSearcher engineSearcher = new EngineSearcher(source, getReferenceManager(scope), store, logger);
releasable = null; // success - hand over the reference to the engine searcher
return engineSearcher;
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
maybeFailEngine("acquire_searcher", ex);
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error(() -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex);
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
} finally {
Releasables.close(releasable);
}
}
protected abstract ReferenceManager<IndexSearcher> getReferenceManager(SearcherScope scope);
public enum SearcherScope { public enum SearcherScope {
EXTERNAL, INTERNAL EXTERNAL, INTERNAL

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
@ -52,7 +51,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@ -1447,19 +1445,11 @@ public class InternalEngine extends Engine {
if (store.tryIncRef()) { if (store.tryIncRef()) {
// increment the ref just to ensure nobody closes the store during a refresh // increment the ref just to ensure nobody closes the store during a refresh
try { try {
switch (scope) {
case EXTERNAL:
// even though we maintain 2 managers we really do the heavy-lifting only once. // even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc. // the second refresh will only do the extra work we have to do for warming caches etc.
externalSearcherManager.maybeRefreshBlocking(); ReferenceManager<IndexSearcher> referenceManager = getReferenceManager(scope);
// the break here is intentional we never refresh both internal / external together // it is intentional that we never refresh both internal / external together
break; referenceManager.maybeRefreshBlocking();
case INTERNAL:
internalSearcherManager.maybeRefreshBlocking();
break;
default:
throw new IllegalArgumentException("unknown scope: " + scope);
}
} finally { } finally {
store.decRef(); store.decRef();
} }
@ -2010,38 +2000,15 @@ public class InternalEngine extends Engine {
} }
@Override @Override
public Searcher acquireSearcher(String source, SearcherScope scope) { protected final ReferenceManager<IndexSearcher> getReferenceManager(SearcherScope scope) {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
if (store.tryIncRef() == false) {
throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get());
}
Releasable releasable = store::decRef;
try {
final ReferenceManager<IndexSearcher> referenceManager;
switch (scope) { switch (scope) {
case INTERNAL: case INTERNAL:
referenceManager = internalSearcherManager; return internalSearcherManager;
break;
case EXTERNAL: case EXTERNAL:
referenceManager = externalSearcherManager; return externalSearcherManager;
break;
default: default:
throw new IllegalStateException("unknown scope: " + scope); throw new IllegalStateException("unknown scope: " + scope);
} }
EngineSearcher engineSearcher = new EngineSearcher(source, referenceManager, store, logger);
releasable = null; // success - hand over the reference to the engine searcher
return engineSearcher;
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error(() -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex);
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
} finally {
Releasables.close(releasable);
}
} }
private long loadCurrentVersionFromIndex(Term uid) throws IOException { private long loadCurrentVersionFromIndex(Term uid) throws IOException {

View File

@ -1337,7 +1337,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} }
private void failStoreIfCorrupted(Exception e) { private void failStoreIfCorrupted(Exception e) {
if (e instanceof CorruptIndexException || e instanceof IndexFormatTooOldException || e instanceof IndexFormatTooNewException) { if (Lucene.isCorruptionException(e)) {
try { try {
store.markStoreCorrupted((IOException) e); store.markStoreCorrupted((IOException) e);
} catch (IOException inner) { } catch (IOException inner) {