Fix Local Translog Recovery not Updating Safe Commit in Edge Case (#57350) (#57380)

If the local checkpoint in the latest commit is lower than the last
processed local checkpoint, we would recover 0 ops and hence not commit
again. As a result, the logic in `IndexShard#recoverLocallyUpToGlobalCheckpoint`
would not see the latest local checkpoint when it reloads the safe commit
from the store, causing inefficient recoveries that have to work from a
lower local checkpoint than necessary.

Closes #57010
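
Below is a minimal, self-contained sketch of the edge case and of the staleness check this commit introduces. All values are made up for illustration and the class is hypothetical, not Elasticsearch code: the latest commit records a lower local checkpoint than the engine has processed, local translog recovery replays zero operations, and only the new check triggers another commit.

    // Hypothetical values illustrating issue #57010; not part of the codebase.
    public class SafeCommitStalenessSketch {
        public static void main(String[] args) {
            long checkpointInLatestCommit = 5;   // LOCAL_CHECKPOINT_KEY stored in the last commit's user data
            long processedLocalCheckpoint = 9;   // what the engine has actually processed
            int opsRecovered = 0;                // local translog recovery replayed nothing

            // old behavior: only commit after recovery when ops were replayed
            boolean oldBehaviorCommitsAgain = opsRecovered > 0;                                    // false -> safe commit stays stale
            // new behavior: also commit when the last commit's checkpoint lags the processed one
            boolean newBehaviorCommitsAgain = processedLocalCheckpoint > checkpointInLatestCommit; // true  -> safe commit is refreshed

            System.out.println("old: " + oldBehaviorCommitsAgain + ", new: " + newBehaviorCommitsAgain);
        }
    }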
Armin Braun 2020-05-30 09:28:50 +02:00 committed by GitHub
parent d6a3704932
commit 59570eaa7d
4 changed files with 23 additions and 27 deletions

InternalEngine.java

@@ -20,6 +20,7 @@
package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
@@ -453,7 +454,6 @@ public class InternalEngine extends Engine {
@Override
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (pendingTranslogRecovery.get() == false) {
@@ -470,8 +470,6 @@ public class InternalEngine extends Engine {
}
throw e;
}
} finally {
flushLock.unlock();
}
return this;
}
@@ -498,13 +496,10 @@ public class InternalEngine extends Engine {
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered, translog.currentFileGeneration());
commitIndexWriter(indexWriter, translog, null);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
}
logger.trace(() -> new ParameterizedMessage(
"flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered, translog.currentFileGeneration()));
flush(false, true);
translog.trimUnreferencedReaders();
}
@@ -1759,12 +1754,6 @@ public class InternalEngine extends Engine {
"wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing);
}
final byte[] newCommitId;
/*
* Unfortunately the lock order is important here. We have to acquire the readlock first otherwise
* if we are flushing at the end of the recovery while holding the write lock we can deadlock if:
* Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
* Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1
*/
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (flushLock.tryLock() == false) {
@@ -1781,10 +1770,13 @@
}
try {
// Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
// newly created commit points to a different translog generation (can free translog)
// newly created commit points to a different translog generation (can free translog),
// or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
if (hasUncommittedChanges || force || shouldPeriodicallyFlush) {
if (hasUncommittedChanges || force || shouldPeriodicallyFlush
|| getProcessedLocalCheckpoint() > Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))) {
ensureCanFlush();
try {
translog.rollGeneration();
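
To restate the flush decision listed in the comment above as a standalone, hypothetical helper (the class, method, and parameter names are illustrative only; the real check lives inline in InternalEngine#flush):

    // Mirrors the four flush triggers: (1) uncommitted docs, (2) forced, (3) periodic flush due,
    // (4) the local checkpoint recorded in the last commit lags the processed local checkpoint.
    public class FlushDecisionSketch {
        static boolean shouldCommit(boolean hasUncommittedChanges, boolean force, boolean shouldPeriodicallyFlush,
                                    long processedLocalCheckpoint, long lastCommitLocalCheckpoint) {
            return hasUncommittedChanges || force || shouldPeriodicallyFlush
                || processedLocalCheckpoint > lastCommitLocalCheckpoint;
        }

        public static void main(String[] args) {
            // the #57010 edge case: nothing new in Lucene, no force, no periodic flush due,
            // but the last commit's checkpoint (5) lags the processed checkpoint (9)
            System.out.println(shouldCommit(false, false, false, 9, 5)); // true -> commit again
        }
    }

Because `recoverFromTranslogInternal` now always calls `flush(false, true)` (see the hunk above), this condition is what still produces a fresh commit when zero operations were recovered.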

Translog.java

@@ -1615,13 +1615,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
* Roll the current translog generation into a new generation if it's not empty. This does not commit the translog.
*
* @throws IOException if an I/O exception occurred during any file operations
*/
public void rollGeneration() throws IOException {
syncBeforeRollGeneration();
if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
return;
}
try (Releasable ignored = writeLock.acquire()) {
ensureOpen();
try {
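
As a rough, self-contained model of the guard added above (assumed values; the real check consults the live translog writer): rolling is skipped when the current generation holds no operations and the primary term is unchanged, so repeated empty flushes no longer create new translog files. This is also why the test expectations further down drop from generations 4 and 5 to 3.

    // Hypothetical illustration of the early return in Translog#rollGeneration.
    public class RollGenerationSketch {
        public static void main(String[] args) {
            long currentGeneration = 3;
            int operationsInCurrentGeneration = 0;   // nothing was written since the last roll
            long currentPrimaryTerm = 1;
            long generationPrimaryTerm = 1;          // primary term did not change either

            boolean skipRoll = operationsInCurrentGeneration == 0 && currentPrimaryTerm == generationPrimaryTerm;
            long generationAfterRoll = skipRoll ? currentGeneration : currentGeneration + 1;
            System.out.println("generation after rollGeneration(): " + generationAfterRoll); // stays 3
        }
    }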

InternalEngineTests.java

@@ -772,6 +772,7 @@ public class InternalEngineTests extends EngineTestCase {
}
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
recoveringEngine.refresh("test");
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.search(new MatchAllDocsQuery(), collector);
@@ -840,6 +841,7 @@ public class InternalEngineTests extends EngineTestCase {
initialEngine.close();
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
recoveringEngine.refresh("test");
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), docs);
assertEquals(docs, topDocs.totalHits.value);
@@ -1166,13 +1168,13 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 4L : 2L));
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L));
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(3L));
}
public void testSyncTranslogConcurrently() throws Exception {
@@ -3143,11 +3145,10 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(flush ? 1 : 2, translogHandler.appliedOperations());
engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc), primaryTerm.get()));
if (randomBoolean()) {
engine.refresh("test");
} else {
engine.close();
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
}
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), numDocs);
assertThat(topDocs.totalHits.value, equalTo((long) numDocs));
@@ -3209,7 +3210,7 @@ public class InternalEngineTests extends EngineTestCase {
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
engine = createEngine(store, primaryTranslogDir); // and recover again!
assertVisibleCount(engine, numDocs, false);
assertVisibleCount(engine, numDocs, true);
}
public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {

TranslogTests.java

@@ -3234,6 +3234,7 @@ public class TranslogTests extends ESTestCase {
try (Translog brokenTranslog = create(filterFileSystemProvider.getPath(path.toUri()))) {
failOnCopy.set(true);
primaryTerm.incrementAndGet(); // increment primary term to force rolling generation
assertThat(expectThrows(IOException.class, brokenTranslog::rollGeneration).getMessage(), equalTo(expectedExceptionMessage));
assertFalse(brokenTranslog.isOpen());