[ENGINE] Fail engine if Lucene commit fails

This is similar to refresh, if we fail to commit the data we have to fail the
engine since in-ram data is likely discarded. Yet, it's still in translog and might
be recoverable when the node is restarted but we have to treat the engine as failed.
This commit is contained in:
Simon Willnauer 2015-01-31 15:49:51 +01:00
parent 9557625ae7
commit dda7242848
1 changed files with 16 additions and 5 deletions

View File

@ -236,7 +236,7 @@ public class InternalEngine implements Engine {
} }
if (mustCommitTranslogId) { // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it. if (mustCommitTranslogId) { // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it.
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit(); commitIndexWriter(indexWriter);
} }
searcherManager = buildSearchManager(indexWriter); searcherManager = buildSearchManager(indexWriter);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
@ -717,6 +717,17 @@ public class InternalEngine implements Engine {
versionMapRefreshPending.set(false); versionMapRefreshPending.set(false);
} }
private void commitIndexWriter(IndexWriter writer) throws IOException {
try {
writer.commit();
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Throwable ex) {
failEngine("lucene commit failed", ex);
throw ex;
}
}
@Override @Override
public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException { public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen(); ensureOpen();
@ -743,7 +754,7 @@ public class InternalEngine implements Engine {
{ // commit and close the current writer - we write the current tanslog ID just in case { // commit and close the current writer - we write the current tanslog ID just in case
final long translogId = translog.currentId(); final long translogId = translog.currentId();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit(); commitIndexWriter(indexWriter);
indexWriter.rollback(); indexWriter.rollback();
} }
indexWriter = createWriter(); indexWriter = createWriter();
@ -753,7 +764,7 @@ public class InternalEngine implements Engine {
flushNeeded = false; flushNeeded = false;
long translogId = translogIdGenerator.incrementAndGet(); long translogId = translogIdGenerator.incrementAndGet();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit(); commitIndexWriter(indexWriter);
translog.newTranslog(translogId); translog.newTranslog(translogId);
} }
@ -786,7 +797,7 @@ public class InternalEngine implements Engine {
long translogId = translogIdGenerator.incrementAndGet(); long translogId = translogIdGenerator.incrementAndGet();
translog.newTransientTranslog(translogId); translog.newTransientTranslog(translogId);
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit(); commitIndexWriter(indexWriter);
// we need to refresh in order to clear older version values // we need to refresh in order to clear older version values
refresh("version_table_flush"); refresh("version_table_flush");
// we need to move transient to current only after we refresh // we need to move transient to current only after we refresh
@ -823,7 +834,7 @@ public class InternalEngine implements Engine {
try { try {
long translogId = translog.currentId(); long translogId = translog.currentId();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId))); indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit(); commitIndexWriter(indexWriter);
} catch (Throwable e) { } catch (Throwable e) {
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} }