[ENGINE] Move more methods into abstract Engine

This commit is contained in:
Simon Willnauer 2015-02-17 10:58:01 +01:00
parent 1b8d8da648
commit 2e3c6a9118
3 changed files with 60 additions and 60 deletions

View File

@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*
@ -72,6 +73,10 @@ public abstract class Engine implements Closeable {
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final FailedEngineListener failedEngineListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
protected volatile Throwable failedEngine = null;
@ -416,7 +421,45 @@ public abstract class Engine implements Closeable {
public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;
/** fail engine due to some error. the engine will also be closed. */
public abstract void failEngine(String reason, Throwable failure);
public void failEngine(String reason, Throwable failure) {
assert failure != null;
if (failEngineLock.tryLock()) {
store.incRef();
try {
try {
// we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]");
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
logger.warn("failed engine [{}]", failure, reason);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
failedEngineListener.onFailedEngine(shardId, reason, failure);
}
} catch (Throwable t) {
// don't bubble up these exceptions up
logger.warn("failEngine threw exception", t);
} finally {
store.decRef();
}
} else {
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
}
}
/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Throwable t) {
@ -963,4 +1006,17 @@ public abstract class Engine implements Closeable {
}
protected abstract SearcherManager getSearcherManager();
protected abstract void closeNoLock(String reason) throws ElasticsearchException;
@Override
public void close() throws IOException {
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
logger.debug("close now acquiring writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
logger.debug("close acquired writeLock");
closeNoLock("api");
}
}
}
}

View File

@ -80,10 +80,6 @@ public class InternalEngine extends Engine {
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
private final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
private final IndexWriter indexWriter;
private final SearcherFactory searcherFactory;
@ -101,7 +97,6 @@ public class InternalEngine extends Engine {
private final LiveVersionMap versionMap;
private final Object[] dirtyLocks;
private final ReentrantLock failEngineLock = new ReentrantLock();
private final AtomicLong translogIdGenerator = new AtomicLong();
private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
@ -892,23 +887,12 @@ public class InternalEngine extends Engine {
}
}
@Override
public void close() throws ElasticsearchException {
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
logger.trace("close now acquire writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
logger.trace("close now acquired writeLock");
closeNoLock("api");
}
}
}
/**
* Closes the engine without acquiring the write lock. This should only be
* called while the write lock is hold or in a disaster condition ie. if the engine
* is failed.
*/
private void closeNoLock(String reason) throws ElasticsearchException {
protected final void closeNoLock(String reason) throws ElasticsearchException {
if (isClosed.compareAndSet(false, true)) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
try {
@ -938,47 +922,6 @@ public class InternalEngine extends Engine {
}
}
@Override
public void failEngine(String reason, Throwable failure) {
assert failure != null;
if (failEngineLock.tryLock()) {
store.incRef();
try {
try {
// we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]");
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
logger.warn("failed engine [{}]", failure, reason);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
failedEngineListener.onFailedEngine(shardId, reason, failure);
}
} catch (Throwable t) {
// don't bubble up these exceptions up
logger.warn("failEngine threw exception", t);
} finally {
store.decRef();
}
} else {
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
}
}
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;

View File

@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Random;
@ -76,7 +77,7 @@ public class MockInternalEngine extends InternalEngine {
}
@Override
public void close() {
public void close() throws IOException {
try {
super.close();
} finally {