From dda72428484b4a5d79b6d9c97d56365851a79543 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 31 Jan 2015 15:49:51 +0100 Subject: [PATCH] [ENGINE] Fail engine if Lucene commit fails This is similar to refresh, if we fail to commit the data we have to fail the engine since in-ram data is likely discarded. Yet, it's still in translog and might be recoverable when the node is restarted but we have to treat the engine as failed. --- .../index/engine/internal/InternalEngine.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 34ddb7063bc..7b5bbe3a0c1 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -236,7 +236,7 @@ public class InternalEngine implements Engine { } if (mustCommitTranslogId) { // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it. indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); } searcherManager = buildSearchManager(indexWriter); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); @@ -717,6 +717,17 @@ public class InternalEngine implements Engine { versionMapRefreshPending.set(false); } + private void commitIndexWriter(IndexWriter writer) throws IOException { + try { + writer.commit(); + } catch (AlreadyClosedException ex) { + throw ex; + } catch (Throwable ex) { + failEngine("lucene commit failed", ex); + throw ex; + } + } + @Override public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); @@ -743,7 +754,7 @@ public class InternalEngine implements Engine { { // commit and close the current writer - we write the current tanslog ID just in case final long translogId = translog.currentId(); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); indexWriter.rollback(); } indexWriter = createWriter(); @@ -753,7 +764,7 @@ public class InternalEngine implements Engine { flushNeeded = false; long translogId = translogIdGenerator.incrementAndGet(); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); translog.newTranslog(translogId); } @@ -786,7 +797,7 @@ public class InternalEngine implements Engine { long translogId = translogIdGenerator.incrementAndGet(); translog.newTransientTranslog(translogId); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); // we need to refresh in order to clear older version values refresh("version_table_flush"); // we need to move transient to current only after we refresh @@ -823,7 +834,7 @@ public class InternalEngine implements Engine { try { long translogId = translog.currentId(); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); - indexWriter.commit(); + commitIndexWriter(indexWriter); } catch (Throwable e) { throw new FlushFailedEngineException(shardId, e); }