diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5a18494deb3..70da3cdfcfb 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -104,7 +104,6 @@ public class InternalEngine extends Engine { private final SearcherManager searcherManager; private final AtomicBoolean isClosed = new AtomicBoolean(false); - private volatile boolean closedOrFailed = false; private final AtomicBoolean optimizeMutex = new AtomicBoolean(); // we use flushNeeded here, since if there are no changes, then the commit won't write // will not really happen, and then the commitUserData and the new translog will not be reflected @@ -119,7 +118,7 @@ public class InternalEngine extends Engine { private final Object[] dirtyLocks; private volatile Throwable failedEngine = null; - private final Lock failEngineLock = new ReentrantLock(); + private final ReentrantLock failEngineLock = new ReentrantLock(); private final FailedEngineListener failedEngineListener; private final AtomicLong translogIdGenerator = new AtomicLong(); @@ -181,8 +180,10 @@ public class InternalEngine extends Engine { if (success == false) { IOUtils.closeWhileHandlingException(writer, manager); versionMap.clear(); - // failure we need to dec the store reference - store.decRef(); + if (isClosed.get() == false) { + // failure we need to dec the store reference + store.decRef(); + } } } } @@ -227,7 +228,7 @@ public class InternalEngine extends Engine { } if (mustCommitTranslogId) { // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it. indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); } final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter, true), shardId); searcherManager = new SearcherManager(directoryReader, searcherFactory); @@ -238,7 +239,7 @@ public class InternalEngine extends Engine { success = true; return searcherManager; } catch (IOException e) { - maybeFailEngine(e, "start"); + maybeFailEngine("start", e); try { indexWriter.rollback(); } catch (IOException e1) { // iw is closed below @@ -254,7 +255,7 @@ public class InternalEngine extends Engine { } private void updateSettings() { - if (closedOrFailed == false) { + if (isClosed.get() == false) { final LiveIndexWriterConfig iwc = indexWriter.getConfig(); iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush()); } @@ -332,7 +333,7 @@ public class InternalEngine extends Engine { } flushNeeded = true; } catch (OutOfMemoryError | IllegalStateException | IOException t) { - maybeFailEngine(t, "create"); + maybeFailEngine("create", t); throw new CreateFailedEngineException(shardId, create, t); } checkVersionMapRefresh(); @@ -438,7 +439,7 @@ public class InternalEngine extends Engine { } flushNeeded = true; } catch (OutOfMemoryError | IllegalStateException | IOException t) { - maybeFailEngine(t, "index"); + maybeFailEngine("index", t); throw new IndexFailedEngineException(shardId, index, t); } checkVersionMapRefresh(); @@ -451,7 +452,7 @@ public class InternalEngine extends Engine { // TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable? if (versionMap.ramBytesUsedForRefresh() > 0.25 * engineConfig.getIndexingBufferSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) { try { - if (closedOrFailed) { + if (isClosed.get()) { // no point... return; } @@ -530,7 +531,7 @@ public class InternalEngine extends Engine { innerDelete(delete); flushNeeded = true; } catch (OutOfMemoryError | IllegalStateException | IOException t) { - maybeFailEngine(t, "delete"); + maybeFailEngine("delete", t); throw new DeleteFailedEngineException(shardId, delete, t); } @@ -609,7 +610,7 @@ public class InternalEngine extends Engine { translog.add(new Translog.DeleteByQuery(delete)); flushNeeded = true; } catch (Throwable t) { - maybeFailEngine(t, "delete_by_query"); + maybeFailEngine("delete_by_query", t); throw new DeleteByQueryFailedEngineException(shardId, delete, t); } @@ -684,6 +685,7 @@ public class InternalEngine extends Engine { searcherManager.maybeRefreshBlocking(); } catch (AlreadyClosedException e) { ensureOpen(); + maybeFailEngine("refresh", e); } catch (EngineClosedException e) { throw e; } catch (Throwable t) { @@ -737,7 +739,7 @@ public class InternalEngine extends Engine { long translogId = translogIdGenerator.incrementAndGet(); translog.newTransientTranslog(translogId); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); // we need to refresh in order to clear older version values refresh("version_table_flush"); // we need to move transient to current only after we refresh @@ -774,7 +776,7 @@ public class InternalEngine extends Engine { try { long translogId = translog.currentId(); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); } catch (Throwable e) { throw new FlushFailedEngineException(shardId, e); } @@ -788,19 +790,22 @@ public class InternalEngine extends Engine { } // reread the last committed segment infos + store.incRef(); try (ReleasableLock _ = readLock.acquire()) { ensureOpen(); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); } catch (Throwable e) { - if (closedOrFailed == false) { + if (isClosed.get() == false) { logger.warn("failed to read latest segment infos on flush", e); if (Lucene.isCorruptionException(e)) { throw new FlushFailedEngineException(shardId, e); } } + } finally { + store.decRef(); } } catch (FlushFailedEngineException ex) { - maybeFailEngine(ex, "flush"); + maybeFailEngine("flush", ex); throw ex; } finally { flushLock.unlock(); @@ -809,7 +814,7 @@ public class InternalEngine extends Engine { } private void ensureOpen() { - if (closedOrFailed) { + if (isClosed.get()) { throw new EngineClosedException(shardId, failedEngine); } } @@ -886,7 +891,7 @@ public class InternalEngine extends Engine { indexWriter.forceMerge(maxNumSegments, false); } } catch (Throwable t) { - maybeFailEngine(t, "optimize"); + maybeFailEngine("optimize", t); throw new OptimizeFailedEngineException(shardId, t); } finally { optimizeMutex.set(false); @@ -939,7 +944,7 @@ public class InternalEngine extends Engine { try { phase1Snapshot = deletionPolicy.snapshot(); } catch (Throwable e) { - maybeFailEngine(e, "recovery"); + maybeFailEngine("recovery", e); Releasables.closeWhileHandlingException(onGoingRecoveries); throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e); } @@ -947,7 +952,7 @@ public class InternalEngine extends Engine { try { recoveryHandler.phase1(phase1Snapshot); } catch (Throwable e) { - maybeFailEngine(e, "recovery phase 1"); + maybeFailEngine("recovery phase 1", e); Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot); throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e)); } @@ -956,14 +961,14 @@ public class InternalEngine extends Engine { try { phase2Snapshot = translog.snapshot(); } catch (Throwable e) { - maybeFailEngine(e, "snapshot recovery"); + maybeFailEngine("snapshot recovery", e); Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot); throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e)); } try { recoveryHandler.phase2(phase2Snapshot); } catch (Throwable e) { - maybeFailEngine(e, "recovery phase 2"); + maybeFailEngine("recovery phase 2", e); Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot); throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e)); } @@ -977,7 +982,7 @@ public class InternalEngine extends Engine { recoveryHandler.phase3(phase3Snapshot); success = true; } catch (Throwable e) { - maybeFailEngine(e, "recovery phase 3"); + maybeFailEngine("recovery phase 3", e); throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e)); } finally { Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot, @@ -985,7 +990,7 @@ public class InternalEngine extends Engine { } } - private boolean maybeFailEngine(Throwable t, String source) { + private boolean maybeFailEngine(String source, Throwable t) { if (Lucene.isCorruptionException(t)) { if (engineConfig.isFailEngineOnCorruption()) { failEngine("corrupt file detected source: [" + source + "]", t); @@ -996,12 +1001,25 @@ public class InternalEngine extends Engine { } else if (ExceptionsHelper.isOOM(t)) { failEngine("out of memory", t); return true; + } else if (t instanceof AlreadyClosedException) { + // if we are already closed due to some tragic exception + // we need to fail the engine. it might have already been failed before + // but we are double-checking it's failed and closed + if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) { + failEngine("already closed by tragic event", indexWriter.getTragicException()); + } + return true; + } else if (t != null && indexWriter.isOpen() == false && indexWriter.getTragicException() == t) { + // this spot on - we are handling the tragic event exception here so we have to fail the engine + // right away + failEngine(source, t); + return true; } return false; } private Throwable wrapIfClosed(Throwable t) { - if (closedOrFailed) { + if (isClosed.get()) { if (t != failedEngine && failedEngine != null) { t.addSuppressed(failedEngine); } @@ -1118,37 +1136,47 @@ public class InternalEngine extends Engine { @Override public void close() throws ElasticsearchException { - logger.debug("close now acquire writeLock"); - try (ReleasableLock _ = writeLock.acquire()) { - logger.debug("close acquired writeLock"); - if (isClosed.compareAndSet(false, true)) { + if (isClosed.get() == false) { // don't acquire the write lock if we are already closed + logger.trace("close now acquire writeLock"); + try (ReleasableLock _ = writeLock.acquire()) { + logger.trace("close now acquired writeLock"); + closeNoLock("api"); + } + } + } + + /** + * Closes the engine without acquiring the write lock. This should only be + * called while the write lock is hold or in a disaster condition ie. if the engine + * is failed. + */ + private void closeNoLock(String reason) throws ElasticsearchException { + if (isClosed.compareAndSet(false, true)) { + assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; + try { + this.versionMap.clear(); + logger.trace("close searcherManager"); try { - closedOrFailed = true; - this.versionMap.clear(); - logger.debug("close searcherManager"); - try { - IOUtils.close(searcherManager); - } catch (Throwable t) { - logger.warn("Failed to close SearcherManager", t); - } - // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed - if (indexWriter != null) { - logger.debug("rollback indexWriter"); - try { - indexWriter.rollback(); - } catch (AlreadyClosedException e) { - // ignore - } - logger.debug("rollback indexWriter done"); - } - } catch (Throwable e) { - logger.warn("failed to rollback writer on close", e); - } finally { - store.decRef(); - this.mergeScheduler.removeListener(mergeSchedulerListener); - this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); - engineConfig.getIndexSettingsService().removeListener(listener); + IOUtils.close(searcherManager); + } catch (Throwable t) { + logger.warn("Failed to close SearcherManager", t); } + // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed + logger.trace("rollback indexWriter"); + try { + indexWriter.rollback(); + } catch (AlreadyClosedException e) { + // ignore + } + logger.trace("rollback indexWriter done"); + } catch (Throwable e) { + logger.warn("failed to rollback writer on close", e); + } finally { + store.decRef(); + this.mergeScheduler.removeListener(mergeSchedulerListener); + this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); + engineConfig.getIndexSettingsService().removeListener(listener); + logger.debug("engine closed [{}]", reason); } } } @@ -1157,8 +1185,11 @@ public class InternalEngine extends Engine { public void failEngine(String reason, Throwable failure) { assert failure != null; if (failEngineLock.tryLock()) { + store.incRef(); try { try { + // we just go and close this engine - no way to recover + closeNoLock("engine failed on: [" + reason + "]"); // we first mark the store as corrupted before we notify any listeners // this must happen first otherwise we might try to reallocate so quickly // on the same node that we don't see the corrupted marker file when @@ -1184,14 +1215,7 @@ public class InternalEngine extends Engine { // don't bubble up these exceptions up logger.warn("failEngine threw exception", t); } finally { - closedOrFailed = true; - try (ReleasableLock _ = readLock.acquire()) { - // we take the readlock here to ensure nobody replaces this IW concurrently. - indexWriter.rollback(); - } catch (Throwable t) { - logger.warn("Rolling back indexwriter on engine failure failed", t); - // to be on the safe side we just rollback the IW - } + store.decRef(); } } else { logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure); @@ -1252,7 +1276,7 @@ public class InternalEngine extends Engine { } } catch (Throwable t) { // Don't fail a merge if the warm-up failed - if (closedOrFailed == false) { + if (isClosed.get() == false) { logger.warn("Warm-up failed", t); } if (t instanceof Error) { @@ -1324,7 +1348,7 @@ public class InternalEngine extends Engine { } warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher))); } catch (Throwable e) { - if (closedOrFailed == false) { + if (isClosed.get() == false) { logger.warn("failed to prepare/warm", e); } } finally { @@ -1402,4 +1426,14 @@ public class InternalEngine extends Engine { EngineConfig config() { return engineConfig; } + + + private void commitIndexWriter(IndexWriter writer) throws IOException { + try { + writer.commit(); + } catch (Throwable ex) { + failEngine("lucene commit failed", ex); + throw ex; + } + } } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c0f1bd276fc..f13811b6ed7 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1084,17 +1084,12 @@ public class IndexShard extends AbstractIndexShardComponent { // called by the current engine @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) { - try { - for (Engine.FailedEngineListener listener : delegates) { - try { - listener.onFailedEngine(shardId, reason, failure); - } catch (Exception e) { - logger.warn("exception while notifying engine failure", e); - } + for (Engine.FailedEngineListener listener : delegates) { + try { + listener.onFailedEngine(shardId, reason, failure); + } catch (Exception e) { + logger.warn("exception while notifying engine failure", e); } - } finally { - // close the engine all bets are off... don't use engine() here it can throw an exception - IOUtils.closeWhileHandlingException(currentEngineReference.get()); } } } diff --git a/src/test/java/org/elasticsearch/search/basic/SearchWithRandomExceptionsTests.java b/src/test/java/org/elasticsearch/search/basic/SearchWithRandomExceptionsTests.java index da2caf82d04..9828133e515 100644 --- a/src/test/java/org/elasticsearch/search/basic/SearchWithRandomExceptionsTests.java +++ b/src/test/java/org/elasticsearch/search/basic/SearchWithRandomExceptionsTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.search.basic; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.util.English; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -52,7 +51,6 @@ import java.util.concurrent.ExecutionException; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; -@LuceneTestCase.AwaitsFix(bugUrl = "Boaz Leskes: disabling this until further discussion. Recent failures probably relate to #9211 & #8720 (+ friends)") public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTest { @Test