[ENGINE] Fix deadlock problems when API flush and recovery finalization happen concurrently

Unfortunately the lock order is important in the current flush code. We have to acquire the readlock first; otherwise,
if we are flushing at the end of recovery while holding the write lock, we can deadlock if:
 * Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
 * Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1

This commit acquires the read lock first; it would have been acquired further down anyway and is held for the duration of the flush.
As a side effect, we can now safely flush from close() while holding the writeLock.
Simon Willnauer 2015-02-11 12:21:21 +01:00
parent 2f0d158692
commit 0b0cd1c46c
2 changed files with 53 additions and 55 deletions
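
To make the ordering rule concrete, here is a minimal, self-contained Java sketch (illustrative only, not engine code; the class and method names are invented). As long as every path takes the engine read/write lock before the flush lock, a flusher that loses the race waits at the first lock without holding the second, so the cycle described above cannot form:

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only (not engine code): both paths acquire the
// engine read/write lock BEFORE the flush lock, so the API flusher can
// never sit on the flush lock while waiting for the engine lock.
public class LockOrderSketch {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final ReentrantLock flushLock = new ReentrantLock();

    void apiFlush() {
        rwl.readLock().lock();     // first: engine lock
        try {
            flushLock.lock();      // second: flush lock
            try {
                // ... commit and refresh ...
            } finally {
                flushLock.unlock();
            }
        } finally {
            rwl.readLock().unlock();
        }
    }

    void finishRecovery() {
        rwl.writeLock().lock();    // recovery holds the engine write lock
        try {
            flushLock.lock();      // flush lock still second: no cycle
            try {
                // ... flush at the end of recovery ...
            } finally {
                flushLock.unlock();
            }
        } finally {
            rwl.writeLock().unlock();
        }
    }
}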


@@ -105,7 +105,6 @@ public class InternalEngine extends Engine {
     // we use flushNeeded here, since if there are no changes, then the commit
     // won't really happen, and then the commitUserData and the new translog will not be reflected
     private volatile boolean flushNeeded = false;
-    private final AtomicInteger flushing = new AtomicInteger();
     private final Lock flushLock = new ReentrantLock();
     protected final FlushingRecoveryCounter onGoingRecoveries;
@@ -674,24 +673,35 @@ public class InternalEngine extends Engine {
     private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException {
         ensureOpen();
-        updateIndexWriterSettings();
         if (commitTranslog) {
             // check outside the lock as well so we can check without blocking on the write lock
             if (onGoingRecoveries.get() > 0) {
                 throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush with committing translog is not allowed");
             }
         }
-        int currentFlushing = flushing.incrementAndGet();
-        if (currentFlushing > 1 && waitIfOngoing == false) {
-            flushing.decrementAndGet();
-            throw new FlushNotAllowedEngineException(shardId, "already flushing...");
-        }
-        flushLock.lock();
-        try {
-            if (commitTranslog) {
-                try (ReleasableLock lock = readLock.acquire()) {
-                    ensureOpen();
+        /*
+         * Unfortunately the lock order is important here. We have to acquire the readlock first, otherwise
+         * if we are flushing at the end of the recovery while holding the write lock we can deadlock if:
+         *  Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
+         *  Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1
+         */
+        try (ReleasableLock lock = readLock.acquire()) {
+            ensureOpen();
+            updateIndexWriterSettings();
+            if (flushLock.tryLock() == false) {
+                // if we can't get the lock right away we block if needed, otherwise barf
+                if (waitIfOngoing) {
+                    logger.trace("waiting for in-flight flush to finish");
+                    flushLock.lock();
+                    logger.trace("acquired flush lock after blocking");
+                } else {
+                    throw new FlushNotAllowedEngineException(shardId, "already flushing...");
+                }
+            } else {
+                logger.trace("acquired flush lock immediately");
+            }
+            try {
+                if (commitTranslog) {
                     if (onGoingRecoveries.get() > 0) {
                         throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
                     }
@@ -719,20 +729,10 @@ public class InternalEngine extends Engine {
                             throw new FlushFailedEngineException(shardId, e);
                         }
                     }
-                }
-                // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
-                // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
-                if (engineConfig.isEnableGcDeletes()) {
-                    pruneDeletedTombstones();
-                }
-            } else {
-                // note, it's ok to just commit without cleaning the translog, it's perfectly fine to replay a
-                // translog on an index that was opened on a committed point in time that is "in the future"
-                // of that translog
-                try (ReleasableLock lock = readLock.acquire()) {
-                    ensureOpen();
+                } else {
+                    // note, it's ok to just commit without cleaning the translog, it's perfectly fine to replay a
+                    // translog on an index that was opened on a committed point in time that is "in the future"
+                    // of that translog
                     // we allow to *just* commit if there is an ongoing recovery happening...
                     // it's ok to use this, only a flush will cause a new translogId, and we are locked here from
                     // other flushes use flushLock
@@ -743,36 +743,30 @@ public class InternalEngine extends Engine {
                     } catch (Throwable e) {
                         throw new FlushFailedEngineException(shardId, e);
                     }
                 }
-                // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
-                // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
-                if (engineConfig.isEnableGcDeletes()) {
-                    pruneDeletedTombstones();
-                }
             }
-            // reread the last committed segment infos
-            store.incRef();
-            try (ReleasableLock lock = readLock.acquire()) {
-                ensureOpen();
-                lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
-            } catch (Throwable e) {
-                if (isClosed.get() == false) {
-                    logger.warn("failed to read latest segment infos on flush", e);
-                    if (Lucene.isCorruptionException(e)) {
-                        throw new FlushFailedEngineException(shardId, e);
+                // reread the last committed segment infos
+                try {
+                    lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+                } catch (Throwable e) {
+                    if (isClosed.get() == false) {
+                        logger.warn("failed to read latest segment infos on flush", e);
+                        if (Lucene.isCorruptionException(e)) {
+                            throw new FlushFailedEngineException(shardId, e);
+                        }
+                    }
+                }
+            } catch (FlushFailedEngineException ex) {
+                maybeFailEngine("flush", ex);
+                throw ex;
+            } finally {
+                store.decRef();
+                flushLock.unlock();
+            }
-        } catch (FlushFailedEngineException ex) {
-            maybeFailEngine("flush", ex);
-            throw ex;
-        } finally {
-            flushLock.unlock();
-            flushing.decrementAndGet();
-        }
+        // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
+        // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
+        if (engineConfig.isEnableGcDeletes()) {
+            pruneDeletedTombstones();
+        }
     }
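
As a standalone illustration of the new waitIfOngoing contract (a hypothetical demo class, not part of this commit): a concurrent caller either queues behind the in-flight flush or fails fast, and unlike the removed AtomicInteger counter there is no check-then-act race between testing for an ongoing flush and claiming it.

import java.util.concurrent.locks.ReentrantLock;

// Standalone demo (not engine code) of the tryLock-or-wait pattern the new
// flush uses: a second caller either blocks behind the in-flight flush or
// fails fast with an exception.
public class FlushLockDemo {
    private static final ReentrantLock flushLock = new ReentrantLock();

    static void flush(boolean waitIfOngoing) {
        if (!flushLock.tryLock()) {
            if (!waitIfOngoing) {
                throw new IllegalStateException("already flushing...");
            }
            flushLock.lock(); // block until the in-flight flush completes
        }
        try {
            Thread.sleep(100); // stand-in for the commit + refresh work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            flushLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> flush(true));
        t1.start();
        Thread.sleep(10); // let t1 win the flush lock
        try {
            flush(false); // fails fast: a flush is already in flight
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        t1.join();
    }
}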


@@ -1044,10 +1044,14 @@ public class IndexShard extends AbstractIndexShardComponent {
     }
     private void checkIndex() throws IndexShardException {
-        try {
-            doCheckIndex();
-        } catch (IOException e) {
-            throw new IndexShardException(shardId, "exception during checkindex", e);
+        if (store.tryIncRef()) {
+            try {
+                doCheckIndex();
+            } catch (IOException e) {
+                throw new IndexShardException(shardId, "exception during checkindex", e);
+            } finally {
+                store.decRef();
+            }
+        }
     }
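
The tryIncRef() guard means checkIndex simply skips its work when a concurrent close has already released the store, instead of reading from a closed directory. A simplified sketch of the underlying ref-counting idea (a toy stand-in; the real Store builds on Elasticsearch's own ref-counting utilities):

import java.util.concurrent.atomic.AtomicInteger;

// Toy sketch of the tryIncRef()/decRef() idea: once the count hits zero the
// resource is gone for good, and tryIncRef() refuses to resurrect it.
class RefCountedStore {
    private final AtomicInteger refCount = new AtomicInteger(1);

    boolean tryIncRef() {
        int count;
        do {
            count = refCount.get();
            if (count == 0) {
                return false; // already closed, do not resurrect
            }
        } while (!refCount.compareAndSet(count, count + 1));
        return true;
    }

    void decRef() {
        if (refCount.decrementAndGet() == 0) {
            // last reference gone: release the underlying directory here
        }
    }
}

Callers then follow the same shape as the patched checkIndex: if (store.tryIncRef()) { try { ... } finally { store.decRef(); } }.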