[CORE] Move as much as possible into abstract Engine
This paves the way for more shared code between the `InternalEngine` and `ShadowEngine` by way of the abstract `Engine` class. No actual functionality has been changed.
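To illustrate the shape of the refactoring: shared logic such as searcher acquisition becomes a final method on the abstract base class, written once against a single abstract hook that each concrete engine supplies. A minimal, self-contained sketch of the pattern (simplified stand-in names, not the actual Elasticsearch classes):

    // Template-method pattern applied by this commit, reduced to its core:
    // the base class owns the shared logic, subclasses supply only the hook.
    abstract class EngineSketch {
        // The one hook a concrete engine must provide.
        protected abstract SearcherManagerSketch getSearcherManager();

        // Shared logic lives here once, instead of being copied per subclass.
        public final SearcherSketch acquireSearcher(String source) {
            SearcherManagerSketch manager = getSearcherManager();
            return manager.acquire(source); // ref-counting/error handling omitted
        }
    }

    interface SearcherManagerSketch {
        SearcherSketch acquire(String source);
    }

    interface SearcherSketch {
    }

    class InternalEngineSketch extends EngineSketch {
        private final SearcherManagerSketch searcherManager;

        InternalEngineSketch(SearcherManagerSketch searcherManager) {
            this.searcherManager = searcherManager;
        }

        @Override
        protected SearcherManagerSketch getSearcherManager() {
            return searcherManager;
        }
    }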
parent 8cba6c3abb
commit 305ba33710
Engine.java

@@ -26,30 +26,36 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -59,8 +65,15 @@ import java.util.concurrent.locks.ReentrantLock;
 */
public abstract class Engine implements Closeable {

    protected final ShardId shardId;
    protected final ESLogger logger;
    protected final EngineConfig engineConfig;
    protected final Store store;
    protected final AtomicBoolean isClosed = new AtomicBoolean(false);
    protected final FailedEngineListener failedEngineListener;
    protected final SnapshotDeletionPolicy deletionPolicy;

    protected volatile Throwable failedEngine = null;

    protected Engine(EngineConfig engineConfig) {
        Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
@@ -68,7 +81,11 @@ public abstract class Engine implements Closeable {
        Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");

        this.engineConfig = engineConfig;
        this.shardId = engineConfig.getShardId();
        this.store = engineConfig.getStore();
        this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), engineConfig.getShardId());
        this.failedEngineListener = engineConfig.getFailedEngineListener();
        this.deletionPolicy = engineConfig.getDeletionPolicy();
    }

    /** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
@@ -107,7 +124,7 @@ public abstract class Engine implements Closeable {
    }

    protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
-       return new EngineSearcher(source, searcher, manager, engineConfig.getStore(), logger);
+       return new EngineSearcher(source, searcher, manager, store, logger);
    }

    public final EngineConfig config() {
@@ -181,6 +198,35 @@ public abstract class Engine implements Closeable {

    public abstract void delete(DeleteByQuery delete) throws EngineException;

+   final protected GetResult getFromSearcher(Get get) throws EngineException {
+       final Searcher searcher = acquireSearcher("get");
+       final Versions.DocIdAndVersion docIdAndVersion;
+       try {
+           docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
+       } catch (Throwable e) {
+           Releasables.closeWhileHandlingException(searcher);
+           //TODO: A better exception goes here
+           throw new EngineException(shardId, "Couldn't resolve version", e);
+       }
+
+       if (docIdAndVersion != null) {
+           if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
+               Releasables.close(searcher);
+               Uid uid = Uid.createUid(get.uid().text());
+               throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
+           }
+       }
+
+       if (docIdAndVersion != null) {
+           // don't release the searcher on this path, it is the
+           // responsibility of the caller to call GetResult.release
+           return new GetResult(searcher, docIdAndVersion);
+       } else {
+           Releasables.close(searcher);
+           return GetResult.NOT_EXISTS;
+       }
+   }
+
    public abstract GetResult get(Get get) throws EngineException;

    /**
@@ -190,22 +236,144 @@ public abstract class Engine implements Closeable {
     *
     * @see Searcher#close()
     */
-   public abstract Searcher acquireSearcher(String source) throws EngineException;
+   public final Searcher acquireSearcher(String source) throws EngineException {
+       boolean success = false;
+       /* Acquire order here is store -> manager since we need
+        * to make sure that the store is not closed before
+        * the searcher is acquired. */
+       store.incRef();
+       try {
+           final SearcherManager manager = getSearcherManager(); // can never be null
+           /* This might throw NPE but that's fine we will run ensureOpen()
+            * in the catch block and throw the right exception */
+           final IndexSearcher searcher = manager.acquire();
+           try {
+               final Searcher retVal = newSearcher(source, searcher, manager);
+               success = true;
+               return retVal;
+           } finally {
+               if (!success) {
+                   manager.release(searcher);
+               }
+           }
+       } catch (EngineClosedException ex) {
+           throw ex;
+       } catch (Throwable ex) {
+           ensureOpen(); // throw EngineCloseException here if we are already closed
+           logger.error("failed to acquire searcher, source {}", ex, source);
+           throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
+       } finally {
+           if (!success) { // release the ref in the case of an error...
+               store.decRef();
+           }
+       }
+   }
+
+   protected void ensureOpen() {
+       if (isClosed.get()) {
+           throw new EngineClosedException(shardId, failedEngine);
+       }
+   }

    /**
     * Global stats on segments.
     */
    public abstract SegmentsStats segmentsStats();

+   protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
+       ensureOpen();
+       Map<String, Segment> segments = new HashMap<>();
+
+       // first, go over and compute the search ones...
+       Searcher searcher = acquireSearcher("segments");
+       try {
+           for (LeafReaderContext reader : searcher.reader().leaves()) {
+               SegmentCommitInfo info = segmentReader(reader.reader()).getSegmentInfo();
+               assert !segments.containsKey(info.info.name);
+               Segment segment = new Segment(info.info.name);
+               segment.search = true;
+               segment.docCount = reader.reader().numDocs();
+               segment.delDocCount = reader.reader().numDeletedDocs();
+               segment.version = info.info.getVersion();
+               segment.compound = info.info.getUseCompoundFile();
+               try {
+                   segment.sizeInBytes = info.sizeInBytes();
+               } catch (IOException e) {
+                   logger.trace("failed to get size for [{}]", e, info.info.name);
+               }
+               final SegmentReader segmentReader = segmentReader(reader.reader());
+               segment.memoryInBytes = segmentReader.ramBytesUsed();
+               if (verbose) {
+                   segment.ramTree = Accountables.namedAccountable("root", segmentReader);
+               }
+               // TODO: add more fine grained mem stats values to per segment info here
+               segments.put(info.info.name, segment);
+           }
+       } finally {
+           searcher.close();
+       }
+
+       // now, correlate or add the committed ones...
+       if (lastCommittedSegmentInfos != null) {
+           SegmentInfos infos = lastCommittedSegmentInfos;
+           for (SegmentCommitInfo info : infos) {
+               Segment segment = segments.get(info.info.name);
+               if (segment == null) {
+                   segment = new Segment(info.info.name);
+                   segment.search = false;
+                   segment.committed = true;
+                   segment.docCount = info.info.getDocCount();
+                   segment.delDocCount = info.getDelCount();
+                   segment.version = info.info.getVersion();
+                   segment.compound = info.info.getUseCompoundFile();
+                   try {
+                       segment.sizeInBytes = info.sizeInBytes();
+                   } catch (IOException e) {
+                       logger.trace("failed to get size for [{}]", e, info.info.name);
+                   }
+                   segments.put(info.info.name, segment);
+               } else {
+                   segment.committed = true;
+               }
+           }
+       }
+
+       Segment[] segmentsArr = segments.values().toArray(new Segment[segments.values().size()]);
+       Arrays.sort(segmentsArr, new Comparator<Segment>() {
+           @Override
+           public int compare(Segment o1, Segment o2) {
+               return (int) (o1.getGeneration() - o2.getGeneration());
+           }
+       });
+
+       return segmentsArr;
+   }
+
    /**
     * The list of segments in the engine.
     */
    public abstract List<Segment> segments(boolean verbose);

    /**
     * Returns <tt>true</tt> if a refresh is really needed.
     */
-   public abstract boolean refreshNeeded();
+   public final boolean refreshNeeded() {
+       if (store.tryIncRef()) {
+           /*
+             we need to inc the store here since searcherManager.isSearcherCurrent()
+             acquires a searcher internally and that might keep a file open on the
+             store. this violates the assumption that all files are closed when
+             the store is closed so we need to make sure we increment it here
+            */
+           try {
+               return !getSearcherManager().isSearcherCurrent();
+           } catch (IOException e) {
+               logger.error("failed to access searcher manager", e);
+               failEngine("failed to access searcher manager", e);
+               throw new EngineException(shardId, "failed to access searcher manager", e);
+           } finally {
+               store.decRef();
+           }
+       }
+       return false;
+   }

    /**
     * Refreshes the engine for new search operations to reflect the latest
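One contract worth noting in getFromSearcher() above: when a document is found, the acquired searcher is handed to the GetResult rather than released, so ownership transfers to the caller. A hedged usage sketch of the call site (engine and get are assumed to be in scope; the method names are taken from this diff):

    Engine.GetResult result = engine.get(get);
    try {
        if (result.exists()) {
            // read the held searcher / doc-id-and-version here
        }
    } finally {
        result.release(); // releases the searcher acquired in getFromSearcher()
    }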
@@ -250,6 +418,34 @@ public abstract class Engine implements Closeable {
    /** fail engine due to some error. the engine will also be closed. */
    public abstract void failEngine(String reason, Throwable failure);

+   /** Check whether the engine should be failed */
+   protected boolean maybeFailEngine(String source, Throwable t) {
+       if (Lucene.isCorruptionException(t)) {
+           if (engineConfig.isFailEngineOnCorruption()) {
+               failEngine("corrupt file detected source: [" + source + "]", t);
+               return true;
+           } else {
+               logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source,
+                       EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption());
+           }
+       } else if (ExceptionsHelper.isOOM(t)) {
+           failEngine("out of memory", t);
+           return true;
+       }
+       return false;
+   }
+
+   /** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */
+   protected Throwable wrapIfClosed(Throwable t) {
+       if (isClosed.get()) {
+           if (t != failedEngine && failedEngine != null) {
+               t.addSuppressed(failedEngine);
+           }
+           return new EngineClosedException(shardId, t);
+       }
+       return t;
+   }
+
    public static interface FailedEngineListener {
        void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
    }
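The two helpers above are meant to be called from subclass error paths: maybeFailEngine() decides whether the failure is fatal, and wrapIfClosed() lets callers see an EngineClosedException when the failure raced with a close. A minimal sketch of the intended call site inside an engine operation, using only names that appear in this diff (the surrounding operation body is assumed):

    try {
        // ... some index/delete/refresh work ...
    } catch (Throwable t) {
        maybeFailEngine("refresh", t);  // fails the engine on corruption or OOM
        throw new EngineException(shardId, "refresh failed", wrapIfClosed(t));
    }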
@@ -765,4 +961,6 @@ public abstract class Engine implements Closeable {
            }
        }
    }

+   protected abstract SearcherManager getSearcherManager();
}
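With the shared pieces in place, getSearcherManager() is the only hook the read path needs; a subclass in the spirit of the planned ShadowEngine could stay almost empty there. A hedged sketch (this is not the actual ShadowEngine; it is declared abstract so the remaining abstract methods can be elided, and only types that appear in this diff are used):

    // Hypothetical minimal subclass: supplies the hook, rejects writes.
    abstract class ReadOnlyEngineSketch extends Engine {
        private final SearcherManager searcherManager;

        protected ReadOnlyEngineSketch(EngineConfig engineConfig, SearcherManager searcherManager) {
            super(engineConfig);
            this.searcherManager = searcherManager;
        }

        @Override
        protected SearcherManager getSearcherManager() {
            return searcherManager; // the one hook acquireSearcher()/refreshNeeded() need
        }

        @Override
        public void delete(DeleteByQuery delete) throws EngineException {
            throw new UnsupportedOperationException("read-only engine");
        }
    }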
InternalEngine.java

@@ -25,7 +25,6 @@ import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
@@ -34,18 +33,13 @@ import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
@@ -54,8 +48,6 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
@@ -75,7 +67,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 */
public class InternalEngine extends Engine {

-   protected final ShardId shardId;
    private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
    private final MergeSchedulerListener mergeSchedulerListener;
@@ -85,8 +76,6 @@ public class InternalEngine extends Engine {
    private final ShardIndexingService indexingService;
    @Nullable
    private final IndicesWarmer warmer;
-   private final Store store;
-   private final SnapshotDeletionPolicy deletionPolicy;
    private final Translog translog;
    private final MergePolicyProvider mergePolicyProvider;
    private final MergeSchedulerProvider mergeScheduler;
@@ -100,7 +89,6 @@ public class InternalEngine extends Engine {
    private final SearcherFactory searcherFactory;
    private final SearcherManager searcherManager;

-   private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicBoolean optimizeMutex = new AtomicBoolean();
    // we use flushNeeded here, since if there are no changes, then the commit won't write
    // will not really happen, and then the commitUserData and the new translog will not be reflected
@@ -113,9 +101,7 @@ public class InternalEngine extends Engine {
    private final LiveVersionMap versionMap;

    private final Object[] dirtyLocks;
-   private volatile Throwable failedEngine = null;
    private final ReentrantLock failEngineLock = new ReentrantLock();
-   private final FailedEngineListener failedEngineListener;

    private final AtomicLong translogIdGenerator = new AtomicLong();
    private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
@@ -126,8 +112,6 @@ public class InternalEngine extends Engine {

    public InternalEngine(EngineConfig engineConfig) throws EngineException {
        super(engineConfig);
-       this.store = engineConfig.getStore();
-       this.shardId = engineConfig.getShardId();
        this.versionMap = new LiveVersionMap();
        store.incRef();
        IndexWriter writer = null;
@@ -138,7 +122,6 @@ public class InternalEngine extends Engine {
        this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
        this.indexingService = engineConfig.getIndexingService();
        this.warmer = engineConfig.getWarmer();
-       this.deletionPolicy = engineConfig.getDeletionPolicy();
        this.translog = engineConfig.getTranslog();
        this.mergePolicyProvider = engineConfig.getMergePolicyProvider();
        this.mergeScheduler = engineConfig.getMergeScheduler();
@@ -147,7 +130,6 @@ public class InternalEngine extends Engine {
            dirtyLocks[i] = new Object();
        }

-       this.failedEngineListener = engineConfig.getFailedEngineListener();
        throttle = new IndexThrottle();
        this.searcherFactory = new SearchFactory(engineConfig);
        try {
@@ -251,31 +233,7 @@ public class InternalEngine extends Engine {
            }

            // no version, get the version from the index, we know that we refresh on flush
-           final Searcher searcher = acquireSearcher("get");
-           final Versions.DocIdAndVersion docIdAndVersion;
-           try {
-               docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
-           } catch (Throwable e) {
-               Releasables.closeWhileHandlingException(searcher);
-               //TODO: A better exception goes here
-               throw new EngineException(shardId, "Couldn't resolve version", e);
-           }
-
-           if (docIdAndVersion != null) {
-               if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
-                   Releasables.close(searcher);
-                   Uid uid = Uid.createUid(get.uid().text());
-                   throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
-               }
-           }
-
-           if (docIdAndVersion != null) {
-               // don't release the searcher on this path, it is the responsability of the caller to call GetResult.release
-               return new GetResult(searcher, docIdAndVersion);
-           } else {
-               Releasables.close(searcher);
-               return GetResult.NOT_EXISTS;
-           }
+           return getFromSearcher(get);
        }
    }
@@ -579,63 +537,6 @@ public class InternalEngine extends Engine {
        refresh("delete_by_query");
    }

-   @Override
-   public final Searcher acquireSearcher(String source) throws EngineException {
-       boolean success = false;
-       /* Acquire order here is store -> manager since we need
-        * to make sure that the store is not closed before
-        * the searcher is acquired. */
-       store.incRef();
-       try {
-           final SearcherManager manager = this.searcherManager; // can never be null
-           assert manager != null : "SearcherManager is null";
-           /* This might throw NPE but that's fine we will run ensureOpen()
-            * in the catch block and throw the right exception */
-           final IndexSearcher searcher = manager.acquire();
-           try {
-               final Searcher retVal = newSearcher(source, searcher, manager);
-               success = true;
-               return retVal;
-           } finally {
-               if (!success) {
-                   manager.release(searcher);
-               }
-           }
-       } catch (EngineClosedException ex) {
-           throw ex;
-       } catch (Throwable ex) {
-           ensureOpen(); // throw EngineCloseException here if we are already closed
-           logger.error("failed to acquire searcher, source {}", ex, source);
-           throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
-       } finally {
-           if (!success) { // release the ref in the case of an error...
-               store.decRef();
-           }
-       }
-   }
-
-   @Override
-   public boolean refreshNeeded() {
-       if (store.tryIncRef()) {
-           /*
-             we need to inc the store here since searcherManager.isSearcherCurrent()
-             acquires a searcher internally and that might keep a file open on the
-             store. this violates the assumption that all files are closed when
-             the store is closed so we need to make sure we increment it here
-            */
-           try {
-               return !searcherManager.isSearcherCurrent();
-           } catch (IOException e) {
-               logger.error("failed to access searcher manager", e);
-               failEngine("failed to access searcher manager", e);
-               throw new EngineException(shardId, "failed to access searcher manager", e);
-           } finally {
-               store.decRef();
-           }
-       }
-       return false;
-   }
-
    @Override
    public void refresh(String source) throws EngineException {
        // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
@@ -770,12 +671,6 @@ public class InternalEngine extends Engine {
        }
    }

-   private void ensureOpen() {
-       if (isClosed.get()) {
-           throw new EngineClosedException(shardId, failedEngine);
-       }
-   }
-
    private void pruneDeletedTombstones() {
        long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
@@ -858,7 +753,6 @@ public class InternalEngine extends Engine {
        waitForMerges(flush, upgrade);
    }

    @Override
    public SnapshotIndexCommit snapshotIndex() throws EngineException {
        // we have to flush outside of the readlock otherwise we might have a problem upgrading
@@ -931,18 +825,15 @@ public class InternalEngine extends Engine {
        }
    }

-   private boolean maybeFailEngine(String source, Throwable t) {
-       if (Lucene.isCorruptionException(t)) {
-           if (engineConfig.isFailEngineOnCorruption()) {
-               failEngine("corrupt file detected source: [" + source + "]", t);
-               return true;
-           } else {
-               logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, engineConfig.isFailEngineOnCorruption());
-           }
-       } else if (ExceptionsHelper.isOOM(t)) {
-           failEngine("out of memory", t);
+   @Override
+   protected boolean maybeFailEngine(String source, Throwable t) {
+       boolean shouldFail = super.maybeFailEngine(source, t);
+       if (shouldFail) {
            return true;
-       }
-
-       // Check for AlreadyClosedException
-       if (t instanceof AlreadyClosedException) {
+       } else if (t instanceof AlreadyClosedException) {
            // if we are already closed due to some tragic exception
            // we need to fail the engine. it might have already been failed before
            // but we are double-checking it's failed and closed
@@ -959,16 +850,6 @@ public class InternalEngine extends Engine {
        return false;
    }

-   private Throwable wrapIfClosed(Throwable t) {
-       if (isClosed.get()) {
-           if (t != failedEngine && failedEngine != null) {
-               t.addSuppressed(failedEngine);
-           }
-           return new EngineClosedException(shardId, t);
-       }
-       return t;
-   }
-
    @Override
    public SegmentsStats segmentsStats() {
        ensureOpen();
@@ -993,70 +874,7 @@ public class InternalEngine extends Engine {
    @Override
    public List<Segment> segments(boolean verbose) {
        try (ReleasableLock _ = readLock.acquire()) {
-           ensureOpen();
-           Map<String, Segment> segments = new HashMap<>();
-
-           // first, go over and compute the search ones...
-           Searcher searcher = acquireSearcher("segments");
-           try {
-               for (LeafReaderContext reader : searcher.reader().leaves()) {
-                   SegmentCommitInfo info = segmentReader(reader.reader()).getSegmentInfo();
-                   assert !segments.containsKey(info.info.name);
-                   Segment segment = new Segment(info.info.name);
-                   segment.search = true;
-                   segment.docCount = reader.reader().numDocs();
-                   segment.delDocCount = reader.reader().numDeletedDocs();
-                   segment.version = info.info.getVersion();
-                   segment.compound = info.info.getUseCompoundFile();
-                   try {
-                       segment.sizeInBytes = info.sizeInBytes();
-                   } catch (IOException e) {
-                       logger.trace("failed to get size for [{}]", e, info.info.name);
-                   }
-                   final SegmentReader segmentReader = segmentReader(reader.reader());
-                   segment.memoryInBytes = segmentReader.ramBytesUsed();
-                   if (verbose) {
-                       segment.ramTree = Accountables.namedAccountable("root", segmentReader);
-                   }
-                   // TODO: add more fine grained mem stats values to per segment info here
-                   segments.put(info.info.name, segment);
-               }
-           } finally {
-               searcher.close();
-           }
-
-           // now, correlate or add the committed ones...
-           if (lastCommittedSegmentInfos != null) {
-               SegmentInfos infos = lastCommittedSegmentInfos;
-               for (SegmentCommitInfo info : infos) {
-                   Segment segment = segments.get(info.info.name);
-                   if (segment == null) {
-                       segment = new Segment(info.info.name);
-                       segment.search = false;
-                       segment.committed = true;
-                       segment.docCount = info.info.getDocCount();
-                       segment.delDocCount = info.getDelCount();
-                       segment.version = info.info.getVersion();
-                       segment.compound = info.info.getUseCompoundFile();
-                       try {
-                           segment.sizeInBytes = info.sizeInBytes();
-                       } catch (IOException e) {
-                           logger.trace("failed to get size for [{}]", e, info.info.name);
-                       }
-                       segments.put(info.info.name, segment);
-                   } else {
-                       segment.committed = true;
-                   }
-               }
-           }
-
-           Segment[] segmentsArr = segments.values().toArray(new Segment[segments.values().size()]);
-           Arrays.sort(segmentsArr, new Comparator<Segment>() {
-               @Override
-               public int compare(Segment o1, Segment o2) {
-                   return (int) (o1.getGeneration() - o2.getGeneration());
-               }
-           });
+           Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose);

            // fill in the merges flag
            Set<OnGoingMerge> onGoingMerges = mergeScheduler.onGoingMerges();
@@ -1070,7 +888,6 @@ public class InternalEngine extends Engine {
                }
            }
        }

        return Arrays.asList(segmentsArr);
    }
}
@@ -1162,6 +979,11 @@ public class InternalEngine extends Engine {
        }
    }

+   @Override
+   protected SearcherManager getSearcherManager() {
+       return searcherManager;
+   }
+
    private Object dirtyLock(BytesRef uid) {
        int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length);
        return dirtyLocks[MathUtils.mod(hash, dirtyLocks.length)];
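dirtyLock() above is classic lock striping: hash the document uid into a fixed pool of monitor objects so that operations on different ids rarely contend, while two operations on the same id always serialize on the same lock. A self-contained sketch of the idea (the pool size and hash function are placeholders, not the DJB hash used here):

    import java.util.Arrays;

    class LockStripingSketch {
        private final Object[] locks = new Object[32]; // pool size is an assumption

        LockStripingSketch() {
            for (int i = 0; i < locks.length; i++) {
                locks[i] = new Object();
            }
        }

        // Same uid -> same monitor; different uids spread across the pool.
        Object dirtyLock(byte[] uid) {
            int hash = Arrays.hashCode(uid); // stand-in for DjbHashFunction.DJB_HASH
            return locks[Math.floorMod(hash, locks.length)];
        }
    }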