Ensure close is called under lock in the case of an engine failure
Until now we closed the engine without acquiring the write lock, since most callers were still holding a read lock. This commit removes the code that holds on to the read lock while failing the engine, which means we can simply call #close().
parent 099b9c6b06
commit be14968c44
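
For context, the pattern the diff moves to wraps each lock in a small Releasable handle so that try-with-resources guarantees the matching unlock() on every exit path, instead of hand-written lock()/unlock() pairs. Below is a minimal, self-contained sketch of that pattern; the names LockSketch and doUnderReadLock are invented for illustration, and this is not the project's exact code.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal sketch: acquiring the lock returns an AutoCloseable handle, so the
// unlock happens automatically when the try block exits, normally or not.
final class LockSketch {

    static final class InternalLock implements AutoCloseable {
        private final Lock lock;

        InternalLock(Lock lock) {
            this.lock = lock;
        }

        InternalLock acquire() {
            lock.lock();
            return this;
        }

        @Override
        public void close() {
            lock.unlock();
        }
    }

    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final InternalLock readLock = new InternalLock(rwl.readLock());
    private final InternalLock writeLock = new InternalLock(rwl.writeLock());

    // Replaces: rwl.readLock().lock(); try { ... } finally { rwl.readLock().unlock(); }
    void doUnderReadLock(Runnable operation) {
        try (InternalLock ignored = readLock.acquire()) {
            operation.run();
        }
    }

    void doUnderWriteLock(Runnable operation) {
        try (InternalLock ignored = writeLock.acquire()) {
            operation.run();
        }
    }

    public static void main(String[] args) {
        LockSketch sketch = new LockSketch();
        sketch.doUnderReadLock(() -> System.out.println("read-locked work"));
        sketch.doUnderWriteLock(() -> System.out.println("write-locked work"));
    }
}
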
@@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -118,7 +117,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private final CodecService codecService;

private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final InternalLock readLock = new InternalLock(rwl.readLock());
private final InternalLock writeLock = new InternalLock(rwl.writeLock());

private volatile IndexWriter indexWriter;
@@ -154,7 +155,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private volatile boolean failOnMergeFailure;
private Throwable failedEngine = null;
private final Object failedEngineMutex = new Object();
private final Lock failEngineLock = new ReentrantLock();
private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();

private final AtomicLong translogIdGenerator = new AtomicLong();

@@ -208,15 +209,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
@Override
public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
ByteSizeValue preValue = this.indexingBufferSize;
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
this.indexingBufferSize = indexingBufferSize;
IndexWriter indexWriter = this.indexWriter;
if (indexWriter != null) {
indexWriter.getConfig().setRAMBufferSizeMB(this.indexingBufferSize.mbFrac());
}
} finally {
rwl.readLock().unlock();
}
if (preValue.bytes() != indexingBufferSize.bytes()) {
// its inactive, make sure we do a full flush in this case, since the memory
@@ -245,8 +243,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin

@Override
public void start() throws EngineException {
rwl.writeLock().lock();
try {
try (InternalLock _ = writeLock.acquire()) {
if (indexWriter != null) {
throw new EngineAlreadyStartedException(shardId);
}

@@ -292,8 +289,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
}
} finally {
rwl.writeLock().unlock();
}
}

@@ -314,8 +309,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}

public GetResult get(Get get) throws EngineException {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
if (get.realtime()) {
VersionValue versionValue = versionMap.get(versionKey(get.uid()));
if (versionValue != null) {
@@ -369,16 +363,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
Releasables.close(searcher);
return GetResult.NOT_EXISTS;
}

} finally {
rwl.readLock().unlock();
}
}

@Override
public void create(Create create) throws EngineException {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId, failedEngine);

@@ -387,18 +377,15 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new CreateFailedEngineException(shardId, create, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new CreateFailedEngineException(shardId, create, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine(t);
throw new CreateFailedEngineException(shardId, create, t);
}
throw new CreateFailedEngineException(shardId, create, e);
} finally {
rwl.readLock().unlock();
}

private void maybeFailEngine(Throwable t) {
if (t instanceof OutOfMemoryError || (t instanceof IllegalStateException && t.getMessage().contains("OutOfMemoryError"))) {
failEngine(t);
}
}
@@ -464,8 +451,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin

@Override
public void index(Index index) throws EngineException {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId, failedEngine);

@@ -475,18 +461,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new IndexFailedEngineException(shardId, index, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new IndexFailedEngineException(shardId, index, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new IndexFailedEngineException(shardId, index, e);
} finally {
rwl.readLock().unlock();
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine(t);
throw new IndexFailedEngineException(shardId, index, t);
}
}

@@ -546,8 +523,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin

@Override
public void delete(Delete delete) throws EngineException {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId, failedEngine);
@@ -556,18 +532,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new DeleteFailedEngineException(shardId, delete, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new DeleteFailedEngineException(shardId, delete, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new DeleteFailedEngineException(shardId, delete, e);
} finally {
rwl.readLock().unlock();
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine(t);
throw new DeleteFailedEngineException(shardId, delete, t);
}
}

@@ -620,8 +587,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin

@Override
public void delete(DeleteByQuery delete) throws EngineException {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
IndexWriter writer = this.indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId);

@@ -643,10 +609,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (IOException e) {
throw new DeleteByQueryFailedEngineException(shardId, delete, e);
} finally {
rwl.readLock().unlock();
} catch (Throwable t) {
maybeFailEngine(t);
throw new DeleteByQueryFailedEngineException(shardId, delete, t);
}
//TODO: This is heavy, since we refresh, but we really have to...
refreshVersioningTable(System.currentTimeMillis());
@@ -703,14 +668,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
rwl.readLock().lock();
try {
// this engine always acts as if waitForOperations=true
IndexWriter currentWriter = indexWriter;
if (currentWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
try {
try (InternalLock _ = readLock.acquire()) {
ensureOpen();
// maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
// but, we want to make sure not to loose ant refresh calls, if one is taking time
synchronized (refreshMutex) {

@@ -725,26 +684,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
} catch (AlreadyClosedException e) {
// an index writer got replaced on us, ignore
} catch (OutOfMemoryError e) {
failEngine(e);
throw new RefreshFailedEngineException(shardId, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new RefreshFailedEngineException(shardId, e);
} catch (Throwable e) {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
} else if (currentWriter != indexWriter) {
// an index writer got replaced on us, ignore
} else {
failEngine(e);
throw new RefreshFailedEngineException(shardId, e);
}
}
} finally {
rwl.readLock().unlock();
} catch (EngineClosedException e) {
throw e;
} catch (Throwable t) {
failEngine(t);
throw new RefreshFailedEngineException(shardId, t);
}
}
@@ -766,9 +710,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
flushLock.lock();
try {
if (flush.type() == Flush.Type.NEW_WRITER) {
rwl.writeLock().lock();
try {
ensureOpen();
try (InternalLock _ = writeLock.acquire()) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}

@@ -778,7 +720,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
// that's ok if the index writer failed and is in inconsistent state
// we will get an exception on a dirty operation, and will cause the shard
// to be allocated to a different node
indexWriter.close(false);
currentIndexWriter().close(false);
indexWriter = createWriter();

// commit on a just opened writer will commit even if there are no changes done to it

@@ -799,24 +741,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
logger.warn("Failed to close current SearcherManager", t);
}
refreshVersioningTable(threadPool.estimatedTimeInMillis());
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
} catch (Throwable t) {
throw new FlushFailedEngineException(shardId, t);
}
throw new FlushFailedEngineException(shardId, e);
} catch (Throwable e) {
throw new FlushFailedEngineException(shardId, e);
}
} finally {
rwl.writeLock().unlock();
}
} else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
rwl.readLock().lock();
try {
ensureOpen();
try (InternalLock _ = readLock.acquire()) {
final IndexWriter indexWriter = currentIndexWriter();
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
@@ -833,30 +764,18 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
// so items added to current will still be around for realtime get
// when tans overrides it
translog.makeTransientCurrent();
} catch (OutOfMemoryError e) {
translog.revertTransient();
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new FlushFailedEngineException(shardId, e);
} catch (Throwable e) {
translog.revertTransient();
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
rwl.readLock().unlock();
}
} else if (flush.type() == Flush.Type.COMMIT) {
// note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
// translog on an index that was opened on a committed point in time that is "in the future"
// of that translog
rwl.readLock().lock();
try {
ensureOpen();
try (InternalLock _ = readLock.acquire()) {
final IndexWriter indexWriter = currentIndexWriter();
// we allow to *just* commit if there is an ongoing recovery happening...
// its ok to use this, only a flush will cause a new translogId, and we are locked here from
// other flushes use flushLock
@@ -864,50 +783,50 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
long translogId = translog.currentId();
indexWriter.setCommitData(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
indexWriter.commit();
} catch (OutOfMemoryError e) {
translog.revertTransient();
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new FlushFailedEngineException(shardId, e);
} catch (Throwable e) {
throw new FlushFailedEngineException(shardId, e);
}
} finally {
rwl.readLock().unlock();
}
} else {
throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
}

// reread the last committed segment infos
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
ensureOpen();
readLastCommittedSegmentsInfo();
} catch (Throwable e) {
if (!closed) {
logger.warn("failed to read latest segment infos on flush", e);
}
} finally {
rwl.readLock().unlock();
}

} catch (FlushFailedEngineException ex){
maybeFailEngine(ex.getCause());
throw ex;
} finally {
flushLock.unlock();
flushing.decrementAndGet();
}
}

private void ensureOpen() {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
}

/**
 * Returns the current index writer. This method will never return <code>null</code>
 * @throws EngineClosedException if the engine is already closed
 */
private IndexWriter currentIndexWriter() {
final IndexWriter writer = indexWriter;
if (writer == null) {
throw new EngineClosedException(shardId, failedEngine);
}
return writer;
}

private void refreshVersioningTable(long time) {
// we need to refresh in order to clear older version values
refresh(new Refresh("version_table").force(true));
@@ -938,22 +857,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
return;
}
possibleMergeNeeded = false;
rwl.readLock().lock();
try {
ensureOpen();
Merges.maybeMerge(indexWriter);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new OptimizeFailedEngineException(shardId, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new OptimizeFailedEngineException(shardId, e);
} catch (Throwable e) {
throw new OptimizeFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
try (InternalLock _ = readLock.acquire()) {
Merges.maybeMerge(currentIndexWriter());
} catch (Throwable t) {
maybeFailEngine(t);
throw new OptimizeFailedEngineException(shardId, t);
}
}

@@ -964,16 +872,15 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
if (optimizeMutex.compareAndSet(false, true)) {
ElasticsearchMergePolicy elasticsearchMergePolicy = null;
rwl.readLock().lock();
try {
ensureOpen();
try (InternalLock _ = readLock.acquire()) {
final IndexWriter writer = currentIndexWriter();

if (indexWriter.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) {
elasticsearchMergePolicy = (ElasticsearchMergePolicy) indexWriter.getConfig().getMergePolicy();
if (writer.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) {
elasticsearchMergePolicy = (ElasticsearchMergePolicy) writer.getConfig().getMergePolicy();
}
if (optimize.force() && elasticsearchMergePolicy == null) {
throw new ElasticsearchIllegalStateException("The `force` flag can only be used if the merge policy is an instance of "
+ ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + indexWriter.getConfig().getMergePolicy().getClass().getName() + "]");
+ ElasticsearchMergePolicy.class.getSimpleName() + ", got [" + writer.getConfig().getMergePolicy().getClass().getName() + "]");
}

/*
@@ -986,34 +893,27 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
elasticsearchMergePolicy.setForce(true);
}
if (optimize.onlyExpungeDeletes()) {
Merges.forceMergeDeletes(indexWriter, false);
Merges.forceMergeDeletes(writer, false);
} else if (optimize.maxNumSegments() <= 0) {
Merges.maybeMerge(indexWriter);
Merges.maybeMerge(writer);
possibleMergeNeeded = false;
} else {
Merges.forceMerge(indexWriter, optimize.maxNumSegments(), false);
Merges.forceMerge(writer, optimize.maxNumSegments(), false);
}
} catch (OutOfMemoryError e) {
failEngine(e);
throw new OptimizeFailedEngineException(shardId, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new OptimizeFailedEngineException(shardId, e);
} catch (Throwable e) {
throw new OptimizeFailedEngineException(shardId, e);
} catch (Throwable t) {
maybeFailEngine(t);
throw new OptimizeFailedEngineException(shardId, t);
} finally {
if (elasticsearchMergePolicy != null) {
elasticsearchMergePolicy.setForce(false);
}
rwl.readLock().unlock();
optimizeMutex.set(false);
}

}
// wait for the merges outside of the read lock
if (optimize.waitForMerge()) {
indexWriter.waitForMerges();
currentIndexWriter().waitForMerges();
}
if (optimize.flush()) {
flush(new Flush().force(true).waitIfOngoing(true));
@@ -1023,15 +923,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin

@Override
public SnapshotIndexCommit snapshotIndex() throws EngineException {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
ensureOpen();
return deletionPolicy.snapshot();
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
}
}

@@ -1039,14 +936,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
public void recover(RecoveryHandler recoveryHandler) throws EngineException {
// take a write lock here so it won't happen while a flush is in progress
// this means that next commits will not be allowed once the lock is released
rwl.writeLock().lock();
try {
try (InternalLock _ = writeLock.acquire()) {
if (closed) {
throw new EngineClosedException(shardId);
}
onGoingRecoveries.startRecovery();
} finally {
rwl.writeLock().unlock();
}

SnapshotIndexCommit phase1Snapshot;

@@ -1061,10 +955,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
recoveryHandler.phase1(phase1Snapshot);
} catch (Throwable e) {
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
if (closed) {
e = new EngineClosedException(shardId, e);
}
throw new RecoveryEngineException(shardId, 1, "Execution failed", e);
throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
}

Translog.Snapshot phase2Snapshot;
@@ -1072,23 +963,17 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
phase2Snapshot = translog.snapshot();
} catch (Throwable e) {
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
if (closed) {
e = new EngineClosedException(shardId, e);
}
throw new RecoveryEngineException(shardId, 2, "Snapshot failed", e);
throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e));
}

try {
recoveryHandler.phase2(phase2Snapshot);
} catch (Throwable e) {
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot);
if (closed) {
e = new EngineClosedException(shardId, e);
}
throw new RecoveryEngineException(shardId, 2, "Execution failed", e);
throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
}

rwl.writeLock().lock();
writeLock.acquire();
Translog.Snapshot phase3Snapshot = null;
boolean success = false;
try {

@@ -1096,14 +981,20 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
recoveryHandler.phase3(phase3Snapshot);
success = true;
} catch (Throwable e) {
throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
} finally {
Releasables.close(success, onGoingRecoveries);
rwl.writeLock().unlock();
Releasables.close(success, phase1Snapshot, phase2Snapshot, phase3Snapshot);
Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot,
phase2Snapshot, phase3Snapshot); // hmm why can't we use try-with here?
}
}

private Throwable wrapIfClosed(Throwable t) {
if (closed) {
return new EngineClosedException(shardId, t);
}
return t;
}

private static long getReaderRamBytesUsed(AtomicReaderContext reader) {
final SegmentReader segmentReader = SegmentReaderUtils.segmentReader(reader.reader());
return segmentReader.ramBytesUsed();
@@ -1111,8 +1002,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin

@Override
public SegmentsStats segmentsStats() {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
ensureOpen();
Searcher searcher = acquireSearcher("segments_stats");
try {

@@ -1124,15 +1014,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
} finally {
searcher.close();
}
} finally {
rwl.readLock().unlock();
}
}

@Override
public List<Segment> segments() {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
ensureOpen();
Map<String, Segment> segments = new HashMap<>();
@@ -1208,44 +1095,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}

return Arrays.asList(segmentsArr);
} finally {
rwl.readLock().unlock();
}
}

@Override
public void close() throws ElasticsearchException {
rwl.writeLock().lock();
try {
innerClose();
} finally {
rwl.writeLock().unlock();
}
}

class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@Override
public void onFailedMerge(MergePolicy.MergeException e) {
failEngine(e);
}
}

private void failEngine(Throwable failure) {
synchronized (failedEngineMutex) {
if (failedEngine != null) {
return;
}
logger.warn("failed engine", failure);
failedEngine = failure;
for (FailedEngineListener listener : failedEngineListeners) {
listener.onFailedEngine(shardId, failure);
}
// TODO - should we acquire the writeLock here?
innerClose();
}
}

private void innerClose() {
try (InternalLock _ = writeLock.acquire()) {
if (!closed) {
try {
closed = true;
@@ -1273,6 +1128,36 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
}
}
}

class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@Override
public void onFailedMerge(MergePolicy.MergeException e) {
failEngine(e);
}
}

private void failEngine(Throwable failure) {
if (failEngineLock.tryLock()) {
assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
if (failedEngine != null) {
return;
}
try {
logger.warn("failed engine", failure);
failedEngine = failure;
for (FailedEngineListener listener : failedEngineListeners) {
listener.onFailedEngine(shardId, failure);
}
} finally {
// close the engine whatever happens...
close();
}

} else {
logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now", failure);
}
}

private HashedBytesRef versionKey(Term uid) {
return new HashedBytesRef(uid.bytes());
@@ -1396,8 +1281,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
!codecName.equals(InternalEngine.this.codecName) ||
failOnMergeFailure != InternalEngine.this.failOnMergeFailure ||
codecBloomLoad != codecService.isLoadBloomFilter()) {
rwl.readLock().lock();
try {
try (InternalLock _ = readLock.acquire()) {
if (indexConcurrency != InternalEngine.this.indexConcurrency) {
logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngine.this.indexConcurrency, indexConcurrency);
InternalEngine.this.indexConcurrency = indexConcurrency;

@@ -1420,8 +1304,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
// we need to flush in this case, to load/unload the bloom filters
requiresFlushing = true;
}
} finally {
rwl.readLock().unlock();
}
if (requiresFlushing) {
flush(new Flush().type(Flush.Type.NEW_WRITER));
@@ -1607,4 +1489,43 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
}

private static final class InternalLock implements Releasable {
private final ThreadLocal<Boolean> lockIsHeld;
private final Lock lock;

InternalLock(Lock lock) {
ThreadLocal<Boolean> tl = null;
assert (tl = new ThreadLocal<>()) != null;
lockIsHeld = tl;
this.lock = lock;
}

@Override
public void close() {
lock.unlock();
assert onAssertRelease();
}

InternalLock acquire() throws EngineException {
lock.lock();
assert onAssertLock();
return this;
}

protected boolean onAssertRelease() {
lockIsHeld.set(Boolean.FALSE);
return true;
}

protected boolean onAssertLock() {
lockIsHeld.remove();
return true;
}

boolean assertLockIsHeld() {
Boolean aBoolean = lockIsHeld.get();
return aBoolean != null && aBoolean.booleanValue();
}
}
}
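
The net failure path can be summarized in a short sketch (illustrative only, heavily simplified from the diff; FailFlowSketch and its index() method are invented names): operations no longer call failEngine() while holding the read lock. They let the exception escape the locked region and call maybeFailEngine() afterwards, and failEngine() is guarded by tryLock() so only the first failing thread closes the engine, which in turn takes the write lock.

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified sketch of the new failure flow, not the project's exact code.
final class FailFlowSketch {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final ReentrantLock failEngineLock = new ReentrantLock();
    private volatile Throwable failedEngine;
    private volatile boolean closed;

    void index(Runnable operation) {
        try {
            rwl.readLock().lock();
            try {
                operation.run();
            } finally {
                rwl.readLock().unlock(); // read lock is released before any failure handling
            }
        } catch (OutOfMemoryError | IllegalStateException e) {
            maybeFailEngine(e);          // safe: no read lock held at this point
            throw e;
        }
    }

    private void maybeFailEngine(Throwable t) {
        if (t instanceof OutOfMemoryError
                || (t instanceof IllegalStateException && String.valueOf(t.getMessage()).contains("OutOfMemoryError"))) {
            failEngine(t);
        }
    }

    private void failEngine(Throwable failure) {
        // tryLock: only the first failing thread proceeds; the lock is deliberately
        // never released in this sketch, so later failures become no-ops.
        if (failEngineLock.tryLock()) {
            try {
                if (failedEngine == null) {
                    failedEngine = failure;
                }
            } finally {
                close(); // close the engine whatever happens
            }
        }
    }

    private void close() {
        rwl.writeLock().lock(); // closing always happens under the write lock
        try {
            closed = true;
        } finally {
            rwl.writeLock().unlock();
        }
    }
}

Because index() releases its read lock before maybeFailEngine() runs, close() can take the write lock without deadlocking, which is what the commit message describes.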