diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 987ef8c2402..00c256d0bb4 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @@ -72,6 +73,10 @@ public abstract class Engine implements Closeable { protected final AtomicBoolean isClosed = new AtomicBoolean(false); protected final FailedEngineListener failedEngineListener; protected final SnapshotDeletionPolicy deletionPolicy; + protected final ReentrantLock failEngineLock = new ReentrantLock(); + protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); + protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); protected volatile Throwable failedEngine = null; @@ -416,7 +421,45 @@ public abstract class Engine implements Closeable { public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException; /** fail engine due to some error. the engine will also be closed. */ - public abstract void failEngine(String reason, Throwable failure); + public void failEngine(String reason, Throwable failure) { + assert failure != null; + if (failEngineLock.tryLock()) { + store.incRef(); + try { + try { + // we just go and close this engine - no way to recover + closeNoLock("engine failed on: [" + reason + "]"); + // we first mark the store as corrupted before we notify any listeners + // this must happen first otherwise we might try to reallocate so quickly + // on the same node that we don't see the corrupted marker file when + // the shard is initializing + if (Lucene.isCorruptionException(failure)) { + try { + store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure)); + } catch (IOException e) { + logger.warn("Couldn't marks store corrupted", e); + } + } + } finally { + if (failedEngine != null) { + logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure); + return; + } + logger.warn("failed engine [{}]", failure, reason); + // we must set a failure exception, generate one if not supplied + failedEngine = failure; + failedEngineListener.onFailedEngine(shardId, reason, failure); + } + } catch (Throwable t) { + // don't bubble up these exceptions up + logger.warn("failEngine threw exception", t); + } finally { + store.decRef(); + } + } else { + logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure); + } + } /** Check whether the engine should be failed */ protected boolean maybeFailEngine(String source, Throwable t) { @@ -963,4 +1006,17 @@ public abstract class Engine implements Closeable { } protected abstract SearcherManager getSearcherManager(); + + protected abstract void closeNoLock(String reason) throws ElasticsearchException; + + @Override + public void close() throws IOException { + if (isClosed.get() == false) { // don't acquire the write lock if we are already closed + logger.debug("close now acquiring writeLock"); + try (ReleasableLock _ = writeLock.acquire()) { + logger.debug("close acquired writeLock"); + closeNoLock("api"); + } + } + } } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3879eae7f29..c8b925fc97b 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -80,10 +80,6 @@ public class InternalEngine extends Engine { private final MergePolicyProvider mergePolicyProvider; private final MergeSchedulerProvider mergeScheduler; - private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); - private final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); - private final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); - private final IndexWriter indexWriter; private final SearcherFactory searcherFactory; @@ -101,7 +97,6 @@ public class InternalEngine extends Engine { private final LiveVersionMap versionMap; private final Object[] dirtyLocks; - private final ReentrantLock failEngineLock = new ReentrantLock(); private final AtomicLong translogIdGenerator = new AtomicLong(); private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean(); @@ -892,23 +887,12 @@ public class InternalEngine extends Engine { } } - @Override - public void close() throws ElasticsearchException { - if (isClosed.get() == false) { // don't acquire the write lock if we are already closed - logger.trace("close now acquire writeLock"); - try (ReleasableLock _ = writeLock.acquire()) { - logger.trace("close now acquired writeLock"); - closeNoLock("api"); - } - } - } - /** * Closes the engine without acquiring the write lock. This should only be * called while the write lock is hold or in a disaster condition ie. if the engine * is failed. */ - private void closeNoLock(String reason) throws ElasticsearchException { + protected final void closeNoLock(String reason) throws ElasticsearchException { if (isClosed.compareAndSet(false, true)) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { @@ -938,47 +922,6 @@ public class InternalEngine extends Engine { } } - @Override - public void failEngine(String reason, Throwable failure) { - assert failure != null; - if (failEngineLock.tryLock()) { - store.incRef(); - try { - try { - // we just go and close this engine - no way to recover - closeNoLock("engine failed on: [" + reason + "]"); - // we first mark the store as corrupted before we notify any listeners - // this must happen first otherwise we might try to reallocate so quickly - // on the same node that we don't see the corrupted marker file when - // the shard is initializing - if (Lucene.isCorruptionException(failure)) { - try { - store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure)); - } catch (IOException e) { - logger.warn("Couldn't marks store corrupted", e); - } - } - } finally { - if (failedEngine != null) { - logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure); - return; - } - logger.warn("failed engine [{}]", failure, reason); - // we must set a failure exception, generate one if not supplied - failedEngine = failure; - failedEngineListener.onFailedEngine(shardId, reason, failure); - } - } catch (Throwable t) { - // don't bubble up these exceptions up - logger.warn("failEngine threw exception", t); - } finally { - store.decRef(); - } - } else { - logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure); - } - } - @Override protected SearcherManager getSearcherManager() { return searcherManager; diff --git a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java index a710a2e3933..e9e1037dff7 100644 --- a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import java.io.IOException; import java.lang.reflect.Constructor; import java.util.Map; import java.util.Random; @@ -76,7 +77,7 @@ public class MockInternalEngine extends InternalEngine { } @Override - public void close() { + public void close() throws IOException { try { super.close(); } finally {