[ENGINE] Simplify Engine construction and ref counting

Today the internal engine closes itself it the engine hits an exception
it can not recover from. This complicates a lot of refcounting issues
if such an exception happens during engine creation. This commit
only markes the engine as failed and let the user close it once the exception
bubbles up. Additionally it rolls back the indexwriter to prevent any changes after
the engine is failed.
This commit is contained in:
Simon Willnauer 2015-01-09 01:41:57 +01:00
parent 592f517583
commit d2277d70ff
2 changed files with 96 additions and 113 deletions

View File

@ -26,10 +26,7 @@ import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.*;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
@ -109,7 +106,8 @@ public class InternalEngine implements Engine {
private final SearcherFactory searcherFactory = new SearchFactory();
private volatile SearcherManager searcherManager;
private volatile boolean closed = false;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private volatile boolean closedOrFailed = false;
// flag indicating if a dirty operation has occurred since the last refresh
private volatile boolean dirty = false;
@ -121,8 +119,6 @@ public class InternalEngine implements Engine {
private final Lock flushLock = new ReentrantLock();
protected final RecoveryCounter onGoingRecoveries = new RecoveryCounter();
// A uid (in the form of BytesRef) to the version map
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
private final LiveVersionMap versionMap;
@ -131,7 +127,7 @@ public class InternalEngine implements Engine {
private final Object refreshMutex = new Object();
private Throwable failedEngine = null;
private volatile Throwable failedEngine = null;
private final Lock failEngineLock = new ReentrantLock();
private final FailedEngineListener failedEngineListener;
@ -146,47 +142,55 @@ public class InternalEngine implements Engine {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");
this.shardId = engineConfig.getShardId();
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), shardId);
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.indexingService = engineConfig.getIndexingService();
this.warmer = engineConfig.getWarmer();
this.store = engineConfig.getStore();
this.deletionPolicy = engineConfig.getDeletionPolicy();
this.translog = engineConfig.getTranslog();
this.mergePolicyProvider = engineConfig.getMergePolicyProvider();
this.mergeScheduler = engineConfig.getMergeScheduler();
this.versionMap = new LiveVersionMap();
this.dirtyLocks = new Object[engineConfig.getIndexConcurrency() * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
}
this.mergeSchedulerFailureListener = new FailEngineOnMergeFailure();
this.mergeSchedulerListener = new MergeSchedulerListener();
this.mergeScheduler.addListener(mergeSchedulerListener);
this.mergeScheduler.addFailureListener(mergeSchedulerFailureListener);
this.failedEngineListener = engineConfig.getFailedEngineListener();
throttle = new IndexThrottle();
this.engineConfig = engineConfig;
listener = new EngineConfig.EngineSettingsListener(logger, engineConfig) {
@Override
protected void onChange() {
updateSettings();
store.incRef();
boolean success = false;
try {
this.shardId = engineConfig.getShardId();
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), shardId);
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.indexingService = engineConfig.getIndexingService();
this.warmer = engineConfig.getWarmer();
this.deletionPolicy = engineConfig.getDeletionPolicy();
this.translog = engineConfig.getTranslog();
this.mergePolicyProvider = engineConfig.getMergePolicyProvider();
this.mergeScheduler = engineConfig.getMergeScheduler();
this.versionMap = new LiveVersionMap();
this.dirtyLocks = new Object[engineConfig.getIndexConcurrency() * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
}
};
engineConfig.getIndexSettingsService().addListener(listener);
final IndexWriter writer = start();
assert indexWriter == null : "IndexWriter already initialized";
indexWriter = writer;
this.failedEngineListener = engineConfig.getFailedEngineListener();
throttle = new IndexThrottle();
this.engineConfig = engineConfig;
listener = new EngineConfig.EngineSettingsListener(logger, engineConfig) {
@Override
protected void onChange() {
updateSettings();
}
};
engineConfig.getIndexSettingsService().addListener(listener);
final IndexWriter writer = start();
assert indexWriter == null : "IndexWriter already initialized";
indexWriter = writer;
this.mergeSchedulerFailureListener = new FailEngineOnMergeFailure();
this.mergeSchedulerListener = new MergeSchedulerListener();
this.mergeScheduler.addListener(mergeSchedulerListener);
this.mergeScheduler.addFailureListener(mergeSchedulerFailureListener);
success = true;
} finally {
if (success == false) {
// failure we need to dec the store reference
store.decRef();
}
}
}
@Override
public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
ByteSizeValue preValue = engineConfig.getIndexingBufferSize();
try (InternalLock _ = readLock.acquire()) {
ensureOpen();
engineConfig.setIndexingBufferSize(indexingBufferSize);
@ -213,7 +217,6 @@ public class InternalEngine implements Engine {
}
private IndexWriter start() throws EngineException {
store.incRef();
boolean success = false;
IndexWriter indexWriter = null;
SearcherManager searcherManager = null;
@ -257,14 +260,13 @@ public class InternalEngine implements Engine {
}
} finally {
if (success == false) { // release everything we created on a failure
store.decRef();
IOUtils.closeWhileHandlingException(indexWriter, searcherManager);
IOUtils.closeWhileHandlingException(searcherManager, indexWriter);
}
}
}
private void updateSettings() {
if (closed == false) {
if (closedOrFailed == false) {
final LiveIndexWriterConfig iwc = indexWriter.getConfig();
iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
final boolean concurrencyNeedsUpdate = iwc.getMaxThreadStates() != engineConfig.getIndexConcurrency();
@ -460,7 +462,7 @@ public class InternalEngine implements Engine {
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
if (versionMap.ramBytesUsedForRefresh() > 0.25 * engineConfig.getIndexingBufferSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
try {
if (closed) {
if (closedOrFailed) {
// no point...
return;
}
@ -694,7 +696,6 @@ public class InternalEngine implements Engine {
@Override
public void refresh(String source, boolean force) throws EngineException {
ensureOpen();
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (InternalLock _ = readLock.acquire()) {
@ -712,7 +713,7 @@ public class InternalEngine implements Engine {
}
}
} catch (AlreadyClosedException e) {
// an index writer got replaced on us, ignore
ensureOpen();
} catch (EngineClosedException e) {
throw e;
} catch (Throwable t) {
@ -856,7 +857,7 @@ public class InternalEngine implements Engine {
ensureOpen();
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
if (!closed) {
if (closedOrFailed == false) {
logger.warn("failed to read latest segment infos on flush", e);
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
@ -873,7 +874,7 @@ public class InternalEngine implements Engine {
}
private void ensureOpen() {
if (closed) {
if (closedOrFailed) {
throw new EngineClosedException(shardId, failedEngine);
}
}
@ -884,9 +885,10 @@ public class InternalEngine implements Engine {
* @throws EngineClosedException if the engine is already closed
*/
private IndexWriter currentIndexWriter() {
ensureOpen();
final IndexWriter writer = indexWriter;
if (writer == null) {
assert closed : "Engine is not closed but writer is null";
assert closedOrFailed : "Engine is not closed but writer is null";
throw new EngineClosedException(shardId, failedEngine);
}
return writer;
@ -1006,8 +1008,8 @@ public class InternalEngine implements Engine {
// take a write lock here so it won't happen while a flush is in progress
// this means that next commits will not be allowed once the lock is released
try (InternalLock _ = writeLock.acquire()) {
if (closed) {
throw new EngineClosedException(shardId);
if (closedOrFailed) {
throw new EngineClosedException(shardId, failedEngine);
}
onGoingRecoveries.startRecovery();
}
@ -1077,7 +1079,10 @@ public class InternalEngine implements Engine {
}
private Throwable wrapIfClosed(Throwable t) {
if (closed) {
if (closedOrFailed) {
if (t != failedEngine && failedEngine != null) {
t.addSuppressed(failedEngine);
}
return new EngineClosedException(shardId, t);
}
return t;
@ -1205,9 +1210,9 @@ public class InternalEngine implements Engine {
logger.debug("close now acquire writeLock");
try (InternalLock _ = writeLock.acquire()) {
logger.debug("close acquired writeLock");
if (!closed) {
if (isClosed.compareAndSet(false, true)) {
try {
closed = true;
closedOrFailed = true;
this.versionMap.clear();
logger.debug("close searcherManager");
try {
@ -1228,8 +1233,8 @@ public class InternalEngine implements Engine {
} catch (Throwable e) {
logger.warn("failed to rollback writer on close", e);
} finally {
store.decRef();
indexWriter = null;
store.decRef();
this.mergeScheduler.removeListener(mergeSchedulerListener);
this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
engineConfig.getIndexSettingsService().removeListener(listener);
@ -1239,8 +1244,7 @@ public class InternalEngine implements Engine {
}
LiveIndexWriterConfig currentIndexWriterConfig() {
ensureOpen();
return this.indexWriter.getConfig();
return currentIndexWriter().getConfig();
}
@Override
@ -1248,33 +1252,41 @@ public class InternalEngine implements Engine {
assert failure != null;
if (failEngineLock.tryLock()) {
try {
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
try {
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
logger.warn("failed engine [{}]", failure, reason);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
failedEngineListener.onFailedEngine(shardId, reason, failure);
} finally {
}
} catch (Throwable t) {
// don't bubble up these exceptions up
logger.warn("failEngine threw exception", t);
} finally {
closedOrFailed = true;
try (InternalLock _ = readLock.acquire()) {
// we take the readlock here to ensure nobody replaces this IW concurrently.
if (indexWriter != null) {
// we might be not yet be fully constructed - don't call close
close();
indexWriter.rollback();
}
} catch (Throwable t) {
logger.warn("Rolling back indexwriter on engine failure failed", t);
// to be on the safe side we just rollback the IW
}
}
} else {
@ -1347,7 +1359,7 @@ public class InternalEngine implements Engine {
}
} catch (Throwable t) {
// Don't fail a merge if the warm-up failed
if (!closed) {
if (closedOrFailed == false) {
logger.warn("Warm-up failed", t);
}
if (t instanceof Error) {
@ -1471,7 +1483,7 @@ public class InternalEngine implements Engine {
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new SimpleSearcher("warmer", searcher)));
} catch (Throwable e) {
if (!closed) {
if (closedOrFailed == false) {
logger.warn("failed to prepare/warm", e);
}
} finally {
@ -1510,51 +1522,21 @@ public class InternalEngine implements Engine {
}
private static final class InternalLock implements Releasable {
private final ThreadLocal<AtomicInteger> lockIsHeld;
private final Lock lock;
InternalLock(Lock lock) {
ThreadLocal<AtomicInteger> tl = null;
assert (tl = new ThreadLocal<>()) != null;
lockIsHeld = tl;
this.lock = lock;
}
@Override
public void close() {
lock.unlock();
assert onAssertRelease();
}
InternalLock acquire() throws EngineException {
lock.lock();
assert onAssertLock();
return this;
}
protected boolean onAssertRelease() {
AtomicInteger count = lockIsHeld.get();
if (count.decrementAndGet() == 0) {
lockIsHeld.remove();
}
return true;
}
protected boolean onAssertLock() {
AtomicInteger count = lockIsHeld.get();
if (count == null) {
count = new AtomicInteger(0);
lockIsHeld.set(count);
}
count.incrementAndGet();
return true;
}
boolean assertLockIsHeld() {
AtomicInteger count = lockIsHeld.get();
return count != null && count.get() > 0;
}
}
public void activateThrottling() {

View File

@ -1053,7 +1053,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexShar
}
}
} finally {
IOUtils.closeWhileHandlingException(engineSafe()); // we need to close ourself - we failed all bets are off
// close the engine all bets are off... don't use engineSafe() here it can throw an exception
IOUtils.closeWhileHandlingException(currentEngineReference.get());
}
}
}