diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 8a7933fc0ac..987ef8c2402 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -26,30 +26,36 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.join.BitDocIdSetFilter; import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.Accountables; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import java.io.Closeable; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.io.IOException; +import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -59,8 +65,15 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class Engine implements Closeable { + protected final ShardId shardId; protected final ESLogger logger; protected final EngineConfig engineConfig; + protected final Store store; + protected final AtomicBoolean isClosed = new AtomicBoolean(false); + protected final FailedEngineListener failedEngineListener; + protected final SnapshotDeletionPolicy deletionPolicy; + + protected volatile Throwable failedEngine = null; protected Engine(EngineConfig engineConfig) { Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine"); @@ -68,7 +81,11 @@ public abstract class Engine implements Closeable { Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine"); this.engineConfig = engineConfig; + this.shardId = engineConfig.getShardId(); + this.store = engineConfig.getStore(); this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), engineConfig.getShardId()); + this.failedEngineListener = engineConfig.getFailedEngineListener(); + this.deletionPolicy = engineConfig.getDeletionPolicy(); } /** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */ @@ -107,7 +124,7 @@ public abstract class Engine implements Closeable { } protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) { - return new EngineSearcher(source, searcher, manager, engineConfig.getStore(), logger); + return new EngineSearcher(source, searcher, manager, store, logger); } public final EngineConfig config() { @@ -181,6 +198,35 @@ public abstract class Engine implements Closeable { public abstract void delete(DeleteByQuery delete) throws EngineException; + final protected GetResult getFromSearcher(Get get) throws EngineException { + final Searcher searcher = acquireSearcher("get"); + final Versions.DocIdAndVersion docIdAndVersion; + try { + docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid()); + } catch (Throwable e) { + Releasables.closeWhileHandlingException(searcher); + //TODO: A better exception goes here + throw new EngineException(shardId, "Couldn't resolve version", e); + } + + if (docIdAndVersion != null) { + if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) { + Releasables.close(searcher); + Uid uid = Uid.createUid(get.uid().text()); + throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version()); + } + } + + if (docIdAndVersion != null) { + // don't release the searcher on this path, it is the + // responsibility of the caller to call GetResult.release + return new GetResult(searcher, docIdAndVersion); + } else { + Releasables.close(searcher); + return GetResult.NOT_EXISTS; + } + } + public abstract GetResult get(Get get) throws EngineException; /** @@ -190,22 +236,144 @@ public abstract class Engine implements Closeable { * * @see Searcher#close() */ - public abstract Searcher acquireSearcher(String source) throws EngineException; + public final Searcher acquireSearcher(String source) throws EngineException { + boolean success = false; + /* Acquire order here is store -> manager since we need + * to make sure that the store is not closed before + * the searcher is acquired. */ + store.incRef(); + try { + final SearcherManager manager = getSearcherManager(); // can never be null + /* This might throw NPE but that's fine we will run ensureOpen() + * in the catch block and throw the right exception */ + final IndexSearcher searcher = manager.acquire(); + try { + final Searcher retVal = newSearcher(source, searcher, manager); + success = true; + return retVal; + } finally { + if (!success) { + manager.release(searcher); + } + } + } catch (EngineClosedException ex) { + throw ex; + } catch (Throwable ex) { + ensureOpen(); // throw EngineCloseException here if we are already closed + logger.error("failed to acquire searcher, source {}", ex, source); + throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex); + } finally { + if (!success) { // release the ref in the case of an error... + store.decRef(); + } + } + } + + protected void ensureOpen() { + if (isClosed.get()) { + throw new EngineClosedException(shardId, failedEngine); + } + } /** * Global stats on segments. */ public abstract SegmentsStats segmentsStats(); + protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) { + ensureOpen(); + Map segments = new HashMap<>(); + + // first, go over and compute the search ones... + Searcher searcher = acquireSearcher("segments"); + try { + for (LeafReaderContext reader : searcher.reader().leaves()) { + SegmentCommitInfo info = segmentReader(reader.reader()).getSegmentInfo(); + assert !segments.containsKey(info.info.name); + Segment segment = new Segment(info.info.name); + segment.search = true; + segment.docCount = reader.reader().numDocs(); + segment.delDocCount = reader.reader().numDeletedDocs(); + segment.version = info.info.getVersion(); + segment.compound = info.info.getUseCompoundFile(); + try { + segment.sizeInBytes = info.sizeInBytes(); + } catch (IOException e) { + logger.trace("failed to get size for [{}]", e, info.info.name); + } + final SegmentReader segmentReader = segmentReader(reader.reader()); + segment.memoryInBytes = segmentReader.ramBytesUsed(); + if (verbose) { + segment.ramTree = Accountables.namedAccountable("root", segmentReader); + } + // TODO: add more fine grained mem stats values to per segment info here + segments.put(info.info.name, segment); + } + } finally { + searcher.close(); + } + + // now, correlate or add the committed ones... + if (lastCommittedSegmentInfos != null) { + SegmentInfos infos = lastCommittedSegmentInfos; + for (SegmentCommitInfo info : infos) { + Segment segment = segments.get(info.info.name); + if (segment == null) { + segment = new Segment(info.info.name); + segment.search = false; + segment.committed = true; + segment.docCount = info.info.getDocCount(); + segment.delDocCount = info.getDelCount(); + segment.version = info.info.getVersion(); + segment.compound = info.info.getUseCompoundFile(); + try { + segment.sizeInBytes = info.sizeInBytes(); + } catch (IOException e) { + logger.trace("failed to get size for [{}]", e, info.info.name); + } + segments.put(info.info.name, segment); + } else { + segment.committed = true; + } + } + } + + Segment[] segmentsArr = segments.values().toArray(new Segment[segments.values().size()]); + Arrays.sort(segmentsArr, new Comparator() { + @Override + public int compare(Segment o1, Segment o2) { + return (int) (o1.getGeneration() - o2.getGeneration()); + } + }); + + return segmentsArr; + } + /** * The list of segments in the engine. */ public abstract List segments(boolean verbose); - /** - * Returns true if a refresh is really needed. - */ - public abstract boolean refreshNeeded(); + public final boolean refreshNeeded() { + if (store.tryIncRef()) { + /* + we need to inc the store here since searcherManager.isSearcherCurrent() + acquires a searcher internally and that might keep a file open on the + store. this violates the assumption that all files are closed when + the store is closed so we need to make sure we increment it here + */ + try { + return !getSearcherManager().isSearcherCurrent(); + } catch (IOException e) { + logger.error("failed to access searcher manager", e); + failEngine("failed to access searcher manager", e); + throw new EngineException(shardId, "failed to access searcher manager", e); + } finally { + store.decRef(); + } + } + return false; + } /** * Refreshes the engine for new search operations to reflect the latest @@ -250,6 +418,34 @@ public abstract class Engine implements Closeable { /** fail engine due to some error. the engine will also be closed. */ public abstract void failEngine(String reason, Throwable failure); + /** Check whether the engine should be failed */ + protected boolean maybeFailEngine(String source, Throwable t) { + if (Lucene.isCorruptionException(t)) { + if (engineConfig.isFailEngineOnCorruption()) { + failEngine("corrupt file detected source: [" + source + "]", t); + return true; + } else { + logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, + EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption()); + } + } else if (ExceptionsHelper.isOOM(t)) { + failEngine("out of memory", t); + return true; + } + return false; + } + + /** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */ + protected Throwable wrapIfClosed(Throwable t) { + if (isClosed.get()) { + if (t != failedEngine && failedEngine != null) { + t.addSuppressed(failedEngine); + } + return new EngineClosedException(shardId, t); + } + return t; + } + public static interface FailedEngineListener { void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t); } @@ -765,4 +961,6 @@ public abstract class Engine implements Closeable { } } } + + protected abstract SearcherManager getSearcherManager(); } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index eef06724ecd..3879eae7f29 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -25,7 +25,6 @@ import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.search.*; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.util.Accountables; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; @@ -34,18 +33,13 @@ import org.elasticsearch.cluster.routing.DjbHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.math.MathUtils; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.indexing.ShardIndexingService; import org.elasticsearch.index.mapper.Uid; @@ -54,8 +48,6 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.threadpool.ThreadPool; @@ -75,7 +67,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class InternalEngine extends Engine { - protected final ShardId shardId; private final FailEngineOnMergeFailure mergeSchedulerFailureListener; private final MergeSchedulerListener mergeSchedulerListener; @@ -85,8 +76,6 @@ public class InternalEngine extends Engine { private final ShardIndexingService indexingService; @Nullable private final IndicesWarmer warmer; - private final Store store; - private final SnapshotDeletionPolicy deletionPolicy; private final Translog translog; private final MergePolicyProvider mergePolicyProvider; private final MergeSchedulerProvider mergeScheduler; @@ -100,7 +89,6 @@ public class InternalEngine extends Engine { private final SearcherFactory searcherFactory; private final SearcherManager searcherManager; - private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicBoolean optimizeMutex = new AtomicBoolean(); // we use flushNeeded here, since if there are no changes, then the commit won't write // will not really happen, and then the commitUserData and the new translog will not be reflected @@ -113,9 +101,7 @@ public class InternalEngine extends Engine { private final LiveVersionMap versionMap; private final Object[] dirtyLocks; - private volatile Throwable failedEngine = null; private final ReentrantLock failEngineLock = new ReentrantLock(); - private final FailedEngineListener failedEngineListener; private final AtomicLong translogIdGenerator = new AtomicLong(); private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean(); @@ -126,8 +112,6 @@ public class InternalEngine extends Engine { public InternalEngine(EngineConfig engineConfig) throws EngineException { super(engineConfig); - this.store = engineConfig.getStore(); - this.shardId = engineConfig.getShardId(); this.versionMap = new LiveVersionMap(); store.incRef(); IndexWriter writer = null; @@ -138,7 +122,6 @@ public class InternalEngine extends Engine { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); this.indexingService = engineConfig.getIndexingService(); this.warmer = engineConfig.getWarmer(); - this.deletionPolicy = engineConfig.getDeletionPolicy(); this.translog = engineConfig.getTranslog(); this.mergePolicyProvider = engineConfig.getMergePolicyProvider(); this.mergeScheduler = engineConfig.getMergeScheduler(); @@ -147,7 +130,6 @@ public class InternalEngine extends Engine { dirtyLocks[i] = new Object(); } - this.failedEngineListener = engineConfig.getFailedEngineListener(); throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(engineConfig); try { @@ -251,31 +233,7 @@ public class InternalEngine extends Engine { } // no version, get the version from the index, we know that we refresh on flush - final Searcher searcher = acquireSearcher("get"); - final Versions.DocIdAndVersion docIdAndVersion; - try { - docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid()); - } catch (Throwable e) { - Releasables.closeWhileHandlingException(searcher); - //TODO: A better exception goes here - throw new EngineException(shardId, "Couldn't resolve version", e); - } - - if (docIdAndVersion != null) { - if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) { - Releasables.close(searcher); - Uid uid = Uid.createUid(get.uid().text()); - throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version()); - } - } - - if (docIdAndVersion != null) { - // don't release the searcher on this path, it is the responsability of the caller to call GetResult.release - return new GetResult(searcher, docIdAndVersion); - } else { - Releasables.close(searcher); - return GetResult.NOT_EXISTS; - } + return getFromSearcher(get); } } @@ -579,63 +537,6 @@ public class InternalEngine extends Engine { refresh("delete_by_query"); } - @Override - public final Searcher acquireSearcher(String source) throws EngineException { - boolean success = false; - /* Acquire order here is store -> manager since we need - * to make sure that the store is not closed before - * the searcher is acquired. */ - store.incRef(); - try { - final SearcherManager manager = this.searcherManager; // can never be null - assert manager != null : "SearcherManager is null"; - /* This might throw NPE but that's fine we will run ensureOpen() - * in the catch block and throw the right exception */ - final IndexSearcher searcher = manager.acquire(); - try { - final Searcher retVal = newSearcher(source, searcher, manager); - success = true; - return retVal; - } finally { - if (!success) { - manager.release(searcher); - } - } - } catch (EngineClosedException ex) { - throw ex; - } catch (Throwable ex) { - ensureOpen(); // throw EngineCloseException here if we are already closed - logger.error("failed to acquire searcher, source {}", ex, source); - throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex); - } finally { - if (!success) { // release the ref in the case of an error... - store.decRef(); - } - } - } - - @Override - public boolean refreshNeeded() { - if (store.tryIncRef()) { - /* - we need to inc the store here since searcherManager.isSearcherCurrent() - acquires a searcher internally and that might keep a file open on the - store. this violates the assumption that all files are closed when - the store is closed so we need to make sure we increment it here - */ - try { - return !searcherManager.isSearcherCurrent(); - } catch (IOException e) { - logger.error("failed to access searcher manager", e); - failEngine("failed to access searcher manager", e); - throw new EngineException(shardId, "failed to access searcher manager", e); - } finally { - store.decRef(); - } - } - return false; - } - @Override public void refresh(String source) throws EngineException { // we obtain a read lock here, since we don't want a flush to happen while we are refreshing @@ -770,12 +671,6 @@ public class InternalEngine extends Engine { } } - private void ensureOpen() { - if (isClosed.get()) { - throw new EngineClosedException(shardId, failedEngine); - } - } - private void pruneDeletedTombstones() { long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); @@ -858,7 +753,6 @@ public class InternalEngine extends Engine { waitForMerges(flush, upgrade); } - @Override public SnapshotIndexCommit snapshotIndex() throws EngineException { // we have to flush outside of the readlock otherwise we might have a problem upgrading @@ -931,18 +825,15 @@ public class InternalEngine extends Engine { } } - private boolean maybeFailEngine(String source, Throwable t) { - if (Lucene.isCorruptionException(t)) { - if (engineConfig.isFailEngineOnCorruption()) { - failEngine("corrupt file detected source: [" + source + "]", t); - return true; - } else { - logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption()); - } - } else if (ExceptionsHelper.isOOM(t)) { - failEngine("out of memory", t); + @Override + protected boolean maybeFailEngine(String source, Throwable t) { + boolean shouldFail = super.maybeFailEngine(source, t); + if (shouldFail) { return true; - } else if (t instanceof AlreadyClosedException) { + } + + // Check for AlreadyClosedException + if (t instanceof AlreadyClosedException) { // if we are already closed due to some tragic exception // we need to fail the engine. it might have already been failed before // but we are double-checking it's failed and closed @@ -959,16 +850,6 @@ public class InternalEngine extends Engine { return false; } - private Throwable wrapIfClosed(Throwable t) { - if (isClosed.get()) { - if (t != failedEngine && failedEngine != null) { - t.addSuppressed(failedEngine); - } - return new EngineClosedException(shardId, t); - } - return t; - } - @Override public SegmentsStats segmentsStats() { ensureOpen(); @@ -993,70 +874,7 @@ public class InternalEngine extends Engine { @Override public List segments(boolean verbose) { try (ReleasableLock _ = readLock.acquire()) { - ensureOpen(); - Map segments = new HashMap<>(); - - // first, go over and compute the search ones... - Searcher searcher = acquireSearcher("segments"); - try { - for (LeafReaderContext reader : searcher.reader().leaves()) { - SegmentCommitInfo info = segmentReader(reader.reader()).getSegmentInfo(); - assert !segments.containsKey(info.info.name); - Segment segment = new Segment(info.info.name); - segment.search = true; - segment.docCount = reader.reader().numDocs(); - segment.delDocCount = reader.reader().numDeletedDocs(); - segment.version = info.info.getVersion(); - segment.compound = info.info.getUseCompoundFile(); - try { - segment.sizeInBytes = info.sizeInBytes(); - } catch (IOException e) { - logger.trace("failed to get size for [{}]", e, info.info.name); - } - final SegmentReader segmentReader = segmentReader(reader.reader()); - segment.memoryInBytes = segmentReader.ramBytesUsed(); - if (verbose) { - segment.ramTree = Accountables.namedAccountable("root", segmentReader); - } - // TODO: add more fine grained mem stats values to per segment info here - segments.put(info.info.name, segment); - } - } finally { - searcher.close(); - } - - // now, correlate or add the committed ones... - if (lastCommittedSegmentInfos != null) { - SegmentInfos infos = lastCommittedSegmentInfos; - for (SegmentCommitInfo info : infos) { - Segment segment = segments.get(info.info.name); - if (segment == null) { - segment = new Segment(info.info.name); - segment.search = false; - segment.committed = true; - segment.docCount = info.info.getDocCount(); - segment.delDocCount = info.getDelCount(); - segment.version = info.info.getVersion(); - segment.compound = info.info.getUseCompoundFile(); - try { - segment.sizeInBytes = info.sizeInBytes(); - } catch (IOException e) { - logger.trace("failed to get size for [{}]", e, info.info.name); - } - segments.put(info.info.name, segment); - } else { - segment.committed = true; - } - } - } - - Segment[] segmentsArr = segments.values().toArray(new Segment[segments.values().size()]); - Arrays.sort(segmentsArr, new Comparator() { - @Override - public int compare(Segment o1, Segment o2) { - return (int) (o1.getGeneration() - o2.getGeneration()); - } - }); + Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose); // fill in the merges flag Set onGoingMerges = mergeScheduler.onGoingMerges(); @@ -1070,7 +888,6 @@ public class InternalEngine extends Engine { } } } - return Arrays.asList(segmentsArr); } } @@ -1162,6 +979,11 @@ public class InternalEngine extends Engine { } } + @Override + protected SearcherManager getSearcherManager() { + return searcherManager; + } + private Object dirtyLock(BytesRef uid) { int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length); return dirtyLocks[MathUtils.mod(hash, dirtyLocks.length)];