[CORE] Move as much as possible into abstract Engine
This paves the way for more shared code between the `InternalEngine` and `ShadowEngine` by way of the abstract `Engine` class. No actual functionality has been changed.
@ -26,30 +26,36 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -59,8 +65,15 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class Engine implements Closeable {
protected final ShardId shardId;
protected final ESLogger logger;
protected final EngineConfig engineConfig;
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final FailedEngineListener failedEngineListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected volatile Throwable failedEngine = null;
protected Engine(EngineConfig engineConfig) {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
@ -68,7 +81,11 @@ public abstract class Engine implements Closeable {
Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");
this.engineConfig = engineConfig;
this.shardId = engineConfig.getShardId();
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), engineConfig.getShardId());
this.failedEngineListener = engineConfig.getFailedEngineListener();
this.deletionPolicy = engineConfig.getDeletionPolicy();
/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
@ -107,7 +124,7 @@ public abstract class Engine implements Closeable {
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
return new EngineSearcher(source, searcher, manager, engineConfig.getStore(), logger);
return new EngineSearcher(source, searcher, manager, store, logger);
public final EngineConfig config() {
@ -181,6 +198,35 @@ public abstract class Engine implements Closeable {
public abstract void delete(DeleteByQuery delete) throws EngineException;
final protected GetResult getFromSearcher(Get get) throws EngineException {
final Searcher searcher = acquireSearcher("get");
final Versions.DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
} catch (Throwable e) {
//TODO: A better exception goes here
throw new EngineException(shardId, "Couldn't resolve version", e);
if (docIdAndVersion != null) {
if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
if (docIdAndVersion != null) {
// don't release the searcher on this path, it is the
// responsibility of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion);
} else {
return GetResult.NOT_EXISTS;
public abstract GetResult get(Get get) throws EngineException;
@ -190,22 +236,144 @@ public abstract class Engine implements Closeable {
* @see Searcher#close()
public abstract Searcher acquireSearcher(String source) throws EngineException;
public final Searcher acquireSearcher(String source) throws EngineException {
boolean success = false;
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
try {
final SearcherManager manager = getSearcherManager(); // can never be null
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
try {
final Searcher retVal = newSearcher(source, searcher, manager);
success = true;
return retVal;
} finally {
if (!success) {
} catch (EngineClosedException ex) {
throw ex;
} catch (Throwable ex) {
ensureOpen(); // throw EngineCloseException here if we are already closed
logger.error("failed to acquire searcher, source {}", ex, source);
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
} finally {
if (!success) { // release the ref in the case of an error...
protected void ensureOpen() {
if (isClosed.get()) {
throw new EngineClosedException(shardId, failedEngine);
* Global stats on segments.
public abstract SegmentsStats segmentsStats();
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
Map<String, Segment> segments = new HashMap<>();
// first, go over and compute the search ones...
Searcher searcher = acquireSearcher("segments");
try {
for (LeafReaderContext reader : searcher.reader().leaves()) {
SegmentCommitInfo info = segmentReader(reader.reader()).getSegmentInfo();
assert !segments.containsKey(info.info.name);
Segment segment = new Segment(info.info.name);
segment.search = true;
segment.docCount = reader.reader().numDocs();
segment.delDocCount = reader.reader().numDeletedDocs();
segment.version = info.info.getVersion();
segment.compound = info.info.getUseCompoundFile();
try {
segment.sizeInBytes = info.sizeInBytes();
} catch (IOException e) {
logger.trace("failed to get size for [{}]", e, info.info.name);
final SegmentReader segmentReader = segmentReader(reader.reader());
segment.memoryInBytes = segmentReader.ramBytesUsed();
if (verbose) {
segment.ramTree = Accountables.namedAccountable("root", segmentReader);
// TODO: add more fine grained mem stats values to per segment info here
segments.put(info.info.name, segment);
} finally {
// now, correlate or add the committed ones...
if (lastCommittedSegmentInfos != null) {
SegmentInfos infos = lastCommittedSegmentInfos;
for (SegmentCommitInfo info : infos) {
Segment segment = segments.get(info.info.name);
if (segment == null) {
segment = new Segment(info.info.name);
segment.search = false;
segment.committed = true;
segment.docCount = info.info.getDocCount();
segment.delDocCount = info.getDelCount();
segment.version = info.info.getVersion();
segment.compound = info.info.getUseCompoundFile();
try {
segment.sizeInBytes = info.sizeInBytes();
} catch (IOException e) {
logger.trace("failed to get size for [{}]", e, info.info.name);
segments.put(info.info.name, segment);
} else {
segment.committed = true;
Segment[] segmentsArr = segments.values().toArray(new Segment[segments.values().size()]);
Arrays.sort(segmentsArr, new Comparator<Segment>() {
public int compare(Segment o1, Segment o2) {
return (int) (o1.getGeneration() - o2.getGeneration());
return segmentsArr;
* The list of segments in the engine.
public abstract List<Segment> segments(boolean verbose);
* Returns <tt>true</tt> if a refresh is really needed.
public final boolean refreshNeeded() {
if (store.tryIncRef()) {
we need to inc the store here since searcherManager.isSearcherCurrent()
acquires a searcher internally and that might keep a file open on the
store. this violates the assumption that all files are closed when
the store is closed so we need to make sure we increment it here
public abstract boolean refreshNeeded();
try {
return !getSearcherManager().isSearcherCurrent();
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
throw new EngineException(shardId, "failed to access searcher manager", e);
} finally {
return false;
* Refreshes the engine for new search operations to reflect the latest
@ -250,6 +418,34 @@ public abstract class Engine implements Closeable {
/** fail engine due to some error. the engine will also be closed. */
public abstract void failEngine(String reason, Throwable failure);
/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Throwable t) {
if (Lucene.isCorruptionException(t)) {
if (engineConfig.isFailEngineOnCorruption()) {
failEngine("corrupt file detected source: [" + source + "]", t);
return true;
} else {
logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source,
EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption());
} else if (ExceptionsHelper.isOOM(t)) {
failEngine("out of memory", t);
return true;
return false;
/** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */
protected Throwable wrapIfClosed(Throwable t) {
if (isClosed.get()) {
if (t != failedEngine && failedEngine != null) {
return new EngineClosedException(shardId, t);
return t;
public static interface FailedEngineListener {
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
@ -765,4 +961,6 @@ public abstract class Engine implements Closeable {
protected abstract SearcherManager getSearcherManager();
@ -25,7 +25,6 @@ import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
@ -34,18 +33,13 @@ import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
@ -54,8 +48,6 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
@ -75,7 +67,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class InternalEngine extends Engine {
protected final ShardId shardId;
private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
private final MergeSchedulerListener mergeSchedulerListener;
@ -85,8 +76,6 @@ public class InternalEngine extends Engine {
private final ShardIndexingService indexingService;
private final IndicesWarmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final Translog translog;
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;
@ -100,7 +89,6 @@ public class InternalEngine extends Engine {
private final SearcherFactory searcherFactory;
private final SearcherManager searcherManager;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
// we use flushNeeded here, since if there are no changes, then the commit won't write
// will not really happen, and then the commitUserData and the new translog will not be reflected
@ -113,9 +101,7 @@ public class InternalEngine extends Engine {
private final LiveVersionMap versionMap;
private final Object[] dirtyLocks;
private volatile Throwable failedEngine = null;
private final ReentrantLock failEngineLock = new ReentrantLock();
private final FailedEngineListener failedEngineListener;
private final AtomicLong translogIdGenerator = new AtomicLong();
private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
@ -126,8 +112,6 @@ public class InternalEngine extends Engine {
public InternalEngine(EngineConfig engineConfig) throws EngineException {
this.store = engineConfig.getStore();
this.shardId = engineConfig.getShardId();
this.versionMap = new LiveVersionMap();
IndexWriter writer = null;
@ -138,7 +122,6 @@ public class InternalEngine extends Engine {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.indexingService = engineConfig.getIndexingService();
this.warmer = engineConfig.getWarmer();
this.deletionPolicy = engineConfig.getDeletionPolicy();
this.translog = engineConfig.getTranslog();
this.mergePolicyProvider = engineConfig.getMergePolicyProvider();
this.mergeScheduler = engineConfig.getMergeScheduler();
@ -147,7 +130,6 @@ public class InternalEngine extends Engine {
dirtyLocks[i] = new Object();
this.failedEngineListener = engineConfig.getFailedEngineListener();
throttle = new IndexThrottle();
this.searcherFactory = new SearchFactory(engineConfig);
try {
@ -251,31 +233,7 @@ public class InternalEngine extends Engine {
// no version, get the version from the index, we know that we refresh on flush
final Searcher searcher = acquireSearcher("get");
final Versions.DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
} catch (Throwable e) {
//TODO: A better exception goes here
throw new EngineException(shardId, "Couldn't resolve version", e);
if (docIdAndVersion != null) {
if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
if (docIdAndVersion != null) {
// don't release the searcher on this path, it is the responsability of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion);
} else {
return GetResult.NOT_EXISTS;
return getFromSearcher(get);
@ -579,63 +537,6 @@ public class InternalEngine extends Engine {
public final Searcher acquireSearcher(String source) throws EngineException {
boolean success = false;
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
try {
final SearcherManager manager = this.searcherManager; // can never be null
assert manager != null : "SearcherManager is null";
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
try {
final Searcher retVal = newSearcher(source, searcher, manager);
success = true;
return retVal;
} finally {
if (!success) {
} catch (EngineClosedException ex) {
throw ex;
} catch (Throwable ex) {
ensureOpen(); // throw EngineCloseException here if we are already closed
logger.error("failed to acquire searcher, source {}", ex, source);
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
} finally {
if (!success) { // release the ref in the case of an error...
public boolean refreshNeeded() {
if (store.tryIncRef()) {
we need to inc the store here since searcherManager.isSearcherCurrent()
acquires a searcher internally and that might keep a file open on the
store. this violates the assumption that all files are closed when
the store is closed so we need to make sure we increment it here
try {
return !searcherManager.isSearcherCurrent();
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
throw new EngineException(shardId, "failed to access searcher manager", e);
} finally {
return false;
public void refresh(String source) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
@ -770,12 +671,6 @@ public class InternalEngine extends Engine {
private void ensureOpen() {
if (isClosed.get()) {
throw new EngineClosedException(shardId, failedEngine);
private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
@ -858,7 +753,6 @@ public class InternalEngine extends Engine {
waitForMerges(flush, upgrade);
public SnapshotIndexCommit snapshotIndex() throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
@ -931,18 +825,15 @@ public class InternalEngine extends Engine {
private boolean maybeFailEngine(String source, Throwable t) {
if (Lucene.isCorruptionException(t)) {
if (engineConfig.isFailEngineOnCorruption()) {
failEngine("corrupt file detected source: [" + source + "]", t);
protected boolean maybeFailEngine(String source, Throwable t) {
boolean shouldFail = super.maybeFailEngine(source, t);
if (shouldFail) {
return true;
} else {
logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption());
} else if (ExceptionsHelper.isOOM(t)) {
failEngine("out of memory", t);
return true;
} else if (t instanceof AlreadyClosedException) {
// Check for AlreadyClosedException
if (t instanceof AlreadyClosedException) {
// if we are already closed due to some tragic exception
// we need to fail the engine. it might have already been failed before
// but we are double-checking it's failed and closed
@ -959,16 +850,6 @@ public class InternalEngine extends Engine {
return false;
private Throwable wrapIfClosed(Throwable t) {
if (isClosed.get()) {
if (t != failedEngine && failedEngine != null) {
return new EngineClosedException(shardId, t);
return t;
public SegmentsStats segmentsStats() {
@ -993,70 +874,7 @@ public class InternalEngine extends Engine {
public List<Segment> segments(boolean verbose) {
try (ReleasableLock _ = readLock.acquire()) {
Map<String, Segment> segments = new HashMap<>();
// first, go over and compute the search ones...
Searcher searcher = acquireSearcher("segments");
try {
for (LeafReaderContext reader : searcher.reader().leaves()) {
SegmentCommitInfo info = segmentReader(reader.reader()).getSegmentInfo();
assert !segments.containsKey(info.info.name);
Segment segment = new Segment(info.info.name);
segment.search = true;
segment.docCount = reader.reader().numDocs();
segment.delDocCount = reader.reader().numDeletedDocs();
segment.version = info.info.getVersion();
segment.compound = info.info.getUseCompoundFile();
try {
segment.sizeInBytes = info.sizeInBytes();
} catch (IOException e) {
logger.trace("failed to get size for [{}]", e, info.info.name);
final SegmentReader segmentReader = segmentReader(reader.reader());
segment.memoryInBytes = segmentReader.ramBytesUsed();
if (verbose) {
segment.ramTree = Accountables.namedAccountable("root", segmentReader);
// TODO: add more fine grained mem stats values to per segment info here
segments.put(info.info.name, segment);
} finally {
// now, correlate or add the committed ones...
if (lastCommittedSegmentInfos != null) {
SegmentInfos infos = lastCommittedSegmentInfos;
for (SegmentCommitInfo info : infos) {
Segment segment = segments.get(info.info.name);
if (segment == null) {
segment = new Segment(info.info.name);
segment.search = false;
segment.committed = true;
segment.docCount = info.info.getDocCount();
segment.delDocCount = info.getDelCount();
segment.version = info.info.getVersion();
segment.compound = info.info.getUseCompoundFile();
try {
segment.sizeInBytes = info.sizeInBytes();
} catch (IOException e) {
logger.trace("failed to get size for [{}]", e, info.info.name);
segments.put(info.info.name, segment);
} else {
segment.committed = true;
Segment[] segmentsArr = segments.values().toArray(new Segment[segments.values().size()]);
Arrays.sort(segmentsArr, new Comparator<Segment>() {
public int compare(Segment o1, Segment o2) {
return (int) (o1.getGeneration() - o2.getGeneration());
Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose);
// fill in the merges flag
Set<OnGoingMerge> onGoingMerges = mergeScheduler.onGoingMerges();
@ -1070,7 +888,6 @@ public class InternalEngine extends Engine {
return Arrays.asList(segmentsArr);
@ -1162,6 +979,11 @@ public class InternalEngine extends Engine {
protected SearcherManager getSearcherManager() {
return searcherManager;
private Object dirtyLock(BytesRef uid) {
int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length);
return dirtyLocks[MathUtils.mod(hash, dirtyLocks.length)];
