Harden periodic flush check to avoid endless flush loop (#29125)

In #28350, we fixed an endless flushing loop which may happen on 
replicas by tightening the relation between the flush action and the
periodic flush condition.

1. The periodic flush condition can become enabled again only after a 
flush has disabled it.

2. If the periodic flush condition is enabled, then a flush will
actually happen regardless of the Lucene state.

(1) and (2) guarantee that a flushing loop terminates. Sadly, 
condition (1) can be violated in edge cases because we used two different
algorithms to evaluate the current and the future uncommitted translog size.

- We use the method `uncommittedSizeInBytes` to calculate the current 
  uncommitted size. It is the sum of the sizes of all translog generations at
or above the minGen (which is determined by a given seqno); that is, we pick
a contiguous range of generations starting at the minGen.

- We use the method `sizeOfGensAboveSeqNoInBytes` to calculate the future 
  uncommitted size. It is the sum of the sizes of all translog generations
whose maxSeqNo is at least the given seqNo; here we do not pick a contiguous
range but select generations one by one.

Suppose we have three translog generations `gen1={#1,#2}`, `gen2={}`, 
`gen3={#3}` and `seqno=#1`. Then `uncommittedSizeInBytes` is the sum of gen1,
gen2, and gen3, while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and
gen3 only; gen2 is excluded because its maxSeqNo is still -1 (it contains no
operations). The predicted post-flush size can therefore be smaller than what
the current-size algorithm reports after the flush, so a flush may fail to
disable the periodic flush condition and violate (1).
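
To make the divergence concrete, here is a minimal, self-contained sketch; the `Gen` record and both helper methods are hypothetical stand-ins for the real translog machinery, not the actual Elasticsearch API:

```java
import java.util.List;

// Minimal model of a translog generation: its generation number, the highest
// sequence number it contains (-1 when empty), and its size in bytes.
record Gen(long generation, long maxSeqNo, long sizeInBytes) {}

public class UncommittedSizeDemo {
    // Range-based: sum every generation at or above minGen (like uncommittedSizeInBytes).
    static long sizeByMinGen(List<Gen> gens, long minGen) {
        return gens.stream().filter(g -> g.generation() >= minGen)
                   .mapToLong(Gen::sizeInBytes).sum();
    }

    // Per-generation selection: sum only generations whose maxSeqNo >= seqNo
    // (like sizeOfGensAboveSeqNoInBytes) - empty generations drop out.
    static long sizeAboveSeqNo(List<Gen> gens, long seqNo) {
        return gens.stream().filter(g -> g.maxSeqNo() >= seqNo)
                   .mapToLong(Gen::sizeInBytes).sum();
    }

    public static void main(String[] args) {
        // gen1={#1,#2}, gen2={}, gen3={#3}, each 100 bytes; seqno=#1.
        List<Gen> gens = List.of(new Gen(1, 2, 100), new Gen(2, -1, 100), new Gen(3, 3, 100));
        System.out.println(sizeByMinGen(gens, 1));   // 300: gen1 + gen2 + gen3
        System.out.println(sizeAboveSeqNo(gens, 1)); // 200: gen1 + gen3 (gen2 excluded)
    }
}
```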

This commit removes both the `sizeOfGensAboveSeqNoInBytes` and 
`uncommittedSizeInBytes` methods and makes the engine use the single
range-based `sizeInBytesByMinGen` method to evaluate the periodic flush
condition.

Closes #29097
Relates #28350
Nhat Nguyen 2018-03-22 14:31:15 -04:00 committed by GitHub
parent c93c7f3121
commit 14157c8705
8 changed files with 94 additions and 67 deletions

InternalEngine.java

@@ -1361,7 +1361,8 @@ public class InternalEngine extends Engine {
 ensureOpen();
 ensureCanFlush();
 String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
-if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) {
+long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
+if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
 logger.trace("start renewing sync commit [{}]", syncId);
 commitIndexWriter(indexWriter, translog, syncId);
 logger.debug("successfully sync committed. sync id [{}].", syncId);
@@ -1383,19 +1384,30 @@ public class InternalEngine extends Engine {
 @Override
 public boolean shouldPeriodicallyFlush() {
 ensureOpen();
+final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
 final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
-final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes();
-if (uncommittedSizeOfCurrentCommit < flushThreshold) {
+if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
 return false;
 }
 /*
- * We should only flush if the shouldFlush condition can become false after flushing.
- * This condition will change if the `uncommittedSize` of the new commit is smaller than
- * the `uncommittedSize` of the current commit. This method is to maintain translog only,
- * thus the IndexWriter#hasUncommittedChanges condition is not considered.
+ * We flush to reduce the size of the uncommitted translog, but strictly speaking the uncommitted size won't always
+ * be below the flush threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the
+ * periodic flush condition if that condition will be disabled after a flush. The condition changes when the new
+ * commit points to a later generation than the last commit's (i.e. gen-of-last-commit < gen-of-new-commit) [1].
+ *
+ * When the local checkpoint equals max_seqno and the translog generation of the last commit equals the translog
+ * generation of the new commit, we know that the last generation must contain operations, because its size is above
+ * the flush threshold and the flush threshold is guaranteed to be higher than an empty translog by the setting
+ * validation. This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario
+ * only happens when the generation threshold is close to or above the flush threshold; otherwise we would already
+ * have rolled generations when the generation threshold was reached, and the first condition (i.e. [1]) would
+ * already be satisfied.
+ *
+ * This method maintains the translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered.
 */
-final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1);
-return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit;
+final long translogGenerationOfNewCommit =
+translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration;
+return translogGenerationOfLastCommit < translogGenerationOfNewCommit
+|| localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo();
 }
 @Override

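The termination argument in the comment above can be restated as a small decision function. The following is a condensed, hypothetical sketch with simplified parameters, not the actual InternalEngine method:

```java
// Hedged restatement of the new shouldPeriodicallyFlush logic, with the engine
// state passed in as plain parameters (all names here are illustrative).
public class PeriodicFlushCheck {
    static boolean shouldPeriodicallyFlush(long uncommittedBytes, long flushThresholdBytes,
                                           long genOfLastCommit, long genOfNewCommit,
                                           long localCheckpoint, long maxSeqNo) {
        if (uncommittedBytes < flushThresholdBytes) {
            return false; // not enough uncommitted translog to justify a flush
        }
        // Flush only when it is guaranteed to disable this condition: either the new
        // commit points to a strictly later translog generation (so the uncommitted
        // range shrinks), or the local checkpoint has caught up with max_seqno, in
        // which case the current generation is rolled and the new commit moves past it.
        return genOfLastCommit < genOfNewCommit || localCheckpoint == maxSeqNo;
    }

    public static void main(String[] args) {
        // Above threshold but same generation and a seqno gap: no flush, loop avoided.
        System.out.println(shouldPeriodicallyFlush(1024, 512, 3, 3, 7, 9)); // false
        // Above threshold and the new commit advances the generation: flush.
        System.out.println(shouldPeriodicallyFlush(1024, 512, 3, 4, 7, 9)); // true
    }
}
```

The key invariant is that a flush is triggered only when it is guaranteed to either advance the commit's translog generation or roll the current generation, so the same state can never re-trigger the condition.
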
Translog.java

@@ -356,26 +356,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 }
 }
-/**
- * Returns the number of operations in the translog files that aren't committed to lucene.
- */
-public int uncommittedOperations() {
-return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
-}
-/**
- * Returns the size in bytes of the translog files that aren't committed to lucene.
- */
-public long uncommittedSizeInBytes() {
-return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
-}
 /**
 * Returns the number of operations in the translog files
 */
 public int totalOperations() {
-return totalOperations(-1);
+return totalOperationsByMinGen(-1);
 }
 /**
@@ -406,9 +391,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 }
 /**
- * Returns the number of operations in the transaction files that aren't committed to lucene.
+ * Returns the number of operations in the translog files whose generation is at least the given generation
 */
-private int totalOperations(long minGeneration) {
+public int totalOperationsByMinGen(long minGeneration) {
 try (ReleasableLock ignored = readLock.acquire()) {
 ensureOpen();
 return Stream.concat(readers.stream(), Stream.of(current))
@@ -429,9 +414,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 }
 /**
- * Returns the size in bytes of the translog files above the given generation
+ * Returns the size in bytes of the translog files whose generation is at least the given generation
 */
-private long sizeInBytesByMinGen(long minGeneration) {
+public long sizeInBytesByMinGen(long minGeneration) {
 try (ReleasableLock ignored = readLock.acquire()) {
 ensureOpen();
 return Stream.concat(readers.stream(), Stream.of(current))
@@ -441,16 +426,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 }
 }
-/**
- * Returns the size in bytes of the translog files with ops above the given seqNo
- */
-public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
-try (ReleasableLock ignored = readLock.acquire()) {
-ensureOpen();
-return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
-}
-}
 /**
 * Creates a new translog for the specified generation.
 *
@@ -758,7 +733,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 public TranslogStats stats() {
 // acquire lock to make the two numbers roughly consistent (no file change half way)
 try (ReleasableLock lock = readLock.acquire()) {
-return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes(), earliestLastModifiedAge());
+final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit();
+return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge());
 }
 }

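Both renamed methods share one aggregation pattern, and the new `stats()` wiring simply feeds them the generation of the last commit. Below is a standalone sketch of that pattern, using a hypothetical `Reader` record rather than the real `BaseTranslogReader`:

```java
import java.util.List;
import java.util.stream.Stream;

// Model of the shared aggregation in totalOperationsByMinGen/sizeInBytesByMinGen:
// take the committed readers plus the current writer, keep the contiguous tail of
// generations >= minGeneration, and sum a single attribute.
public class MinGenAggregation {
    record Reader(long generation, int totalOperations, long sizeInBytes) {}

    static int totalOperationsByMinGen(List<Reader> readers, Reader current, long minGeneration) {
        return Stream.concat(readers.stream(), Stream.of(current))
                .filter(r -> r.generation() >= minGeneration)
                .mapToInt(Reader::totalOperations)
                .sum();
    }

    static long sizeInBytesByMinGen(List<Reader> readers, Reader current, long minGeneration) {
        return Stream.concat(readers.stream(), Stream.of(current))
                .filter(r -> r.generation() >= minGeneration)
                .mapToLong(Reader::sizeInBytes)
                .sum();
    }

    public static void main(String[] args) {
        List<Reader> readers = List.of(new Reader(1, 2, 100), new Reader(2, 0, 55));
        Reader current = new Reader(3, 1, 80);
        // Stats for the last commit's generation (e.g. minGen = 2) now use the same
        // range-based algorithm as the periodic flush check:
        System.out.println(totalOperationsByMinGen(readers, current, 2)); // 1
        System.out.println(sizeInBytesByMinGen(readers, current, 2));     // 135
    }
}
```
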
TranslogDeletionPolicy.java

@@ -211,7 +211,6 @@ public class TranslogDeletionPolicy {
 /**
 * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
- * See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()}
 */
 public synchronized long getTranslogGenerationOfLastCommit() {
 return translogGenerationOfLastCommit;

EngineDiskUtilsTests.java

@@ -156,7 +156,7 @@ public class EngineDiskUtilsTests extends EngineTestCase {
 assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
 engine.recoverFromTranslog();
 assertEquals(2, engine.getTranslog().currentFileGeneration());
-assertEquals(0L, engine.getTranslog().uncommittedOperations());
+assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
 }
 }

InternalEngineTests.java

@@ -725,8 +725,7 @@ public class InternalEngineTests extends EngineTestCase {
 super.commitIndexWriter(writer, translog, syncId);
 }
 };
-assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs));
+assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs));
 recoveringEngine.recoverFromTranslog();
 assertTrue(committed.get());
 } finally {
@@ -3614,7 +3613,7 @@ public class InternalEngineTests extends EngineTestCase {
 System.nanoTime(),
 reason));
 assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1)));
-assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled));
+assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled));
 // skip to the op that we added to the translog
 Translog.Operation op;
 Translog.Operation last = null;
@@ -3814,7 +3813,7 @@ public class InternalEngineTests extends EngineTestCase {
 assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo());
 assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
 recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
-assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
+assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
 recoveringEngine.recoverFromTranslog();
 assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
 assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
@@ -3848,7 +3847,7 @@ public class InternalEngineTests extends EngineTestCase {
 try {
 recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
 if (flushed) {
-assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
+assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
 }
 recoveringEngine.recoverFromTranslog();
 assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@@ -4252,7 +4251,8 @@ public class InternalEngineTests extends EngineTestCase {
 public void testShouldPeriodicallyFlush() throws Exception {
 assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
 // A new engine may have more than one empty translog file - the test should account for this extra.
-final long extraTranslogSizeInNewEngine = engine.getTranslog().uncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES;
+final Translog translog = engine.getTranslog();
+final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES;
 int numDocs = between(10, 100);
 for (int id = 0; id < numDocs; id++) {
 final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
@@ -4260,17 +4260,17 @@ public class InternalEngineTests extends EngineTestCase {
 }
 assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
 long flushThreshold = RandomNumbers.randomLongBetween(random(), 100,
-engine.getTranslog().uncommittedSizeInBytes() - extraTranslogSizeInNewEngine);
+engine.getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine);
 final IndexSettings indexSettings = engine.config().getIndexSettings();
 final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
 .settings(Settings.builder().put(indexSettings.getSettings())
 .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
 indexSettings.updateIndexMetaData(indexMetaData);
 engine.onSettingsChanged();
-assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
+assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
 assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
 engine.flush();
-assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
+assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
 // Stale operations skipped by Lucene but added to translog - still able to flush
 for (int id = 0; id < numDocs; id++) {
 final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
@@ -4278,13 +4278,53 @@ public class InternalEngineTests extends EngineTestCase {
 assertThat(result.isCreated(), equalTo(false));
 }
 SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos();
-assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
+assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
 assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
 engine.flush(false, false);
 assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
-assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
+assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
+// If the new index commit still points to the same translog generation as the current index commit,
+// we should not enable the periodic flush condition; otherwise we can get into an infinite loop of flushes.
+engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here
+for (int id = 0; id < numDocs; id++) {
+if (randomBoolean()) {
+translog.rollGeneration();
+}
+final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null);
+engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false));
+if (engine.shouldPeriodicallyFlush()) {
+engine.flush();
+assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
+assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
+}
+}
 }
+public void testStressShouldPeriodicallyFlush() throws Exception {
+final long flushThreshold = randomLongBetween(100, 5000);
+final long generationThreshold = randomLongBetween(1000, 5000);
+final IndexSettings indexSettings = engine.config().getIndexSettings();
+final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
+.settings(Settings.builder().put(indexSettings.getSettings())
+.put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b")
+.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
+indexSettings.updateIndexMetaData(indexMetaData);
+engine.onSettingsChanged();
+final int numOps = scaledRandomIntBetween(100, 10_000);
+for (int i = 0; i < numOps; i++) {
+final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint();
+final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5);
+final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null);
+engine.index(replicaIndexForDoc(doc, 1L, seqno, false));
+if (rarely() && engine.getTranslog().shouldRollGeneration()) {
+engine.rollTranslogGeneration();
+}
+if (rarely() || engine.shouldPeriodicallyFlush()) {
+engine.flush();
+assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
+}
+}
+}
 public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
 final int iters = randomIntBetween(1, 15);

IndexShardIT.java

@@ -342,29 +342,29 @@ public class IndexShardIT extends ESSingleNodeTestCase {
 IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
 assertTrue(shard.shouldPeriodicallyFlush());
 final Translog translog = shard.getEngine().getTranslog();
-assertEquals(2, translog.uncommittedOperations());
+assertEquals(2, translog.stats().getUncommittedOperations());
 client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON)
 .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
 assertBusy(() -> { // this is async
 assertFalse(shard.shouldPeriodicallyFlush());
 });
-assertEquals(0, translog.uncommittedOperations());
+assertEquals(0, translog.stats().getUncommittedOperations());
 translog.sync();
-long size = Math.max(translog.uncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1);
-logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
-translog.uncommittedOperations(), translog.getGeneration());
+long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1);
+logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]",
+translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());
 client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
 IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES))
 .build()).get();
 client().prepareDelete("test", "test", "2").get();
-logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
-translog.uncommittedOperations(), translog.getGeneration());
+logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]",
+translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());
 assertBusy(() -> { // this is async
-logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
-translog.uncommittedOperations(), translog.getGeneration());
+logger.info("--> translog size on iter: [{}] num_ops [{}] generation [{}]",
+translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());
 assertFalse(shard.shouldPeriodicallyFlush());
 });
-assertEquals(0, translog.uncommittedOperations());
+assertEquals(0, translog.stats().getUncommittedOperations());
 }
 public void testMaybeRollTranslogGeneration() throws Exception {

TranslogTests.java

@@ -501,10 +501,10 @@ public class TranslogTests extends ESTestCase {
 translog.rollGeneration();
 operationsInLastGen = 0;
 }
-assertThat(translog.uncommittedOperations(), equalTo(uncommittedOps));
+assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps));
 if (frequently()) {
 markCurrentGenAsCommitted(translog);
-assertThat(translog.uncommittedOperations(), equalTo(operationsInLastGen));
+assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen));
 uncommittedOps = operationsInLastGen;
 }
 }
@@ -2514,7 +2514,7 @@ public class TranslogTests extends ESTestCase {
 long minGenForRecovery = randomLongBetween(generation, generation + rolls);
 commit(translog, minGenForRecovery, generation + rolls);
 assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
-assertThat(translog.uncommittedOperations(), equalTo(0));
+assertThat(translog.stats().getUncommittedOperations(), equalTo(0));
 if (longRetention) {
 for (int i = 0; i <= rolls; i++) {
 assertFileIsPresent(translog, generation + i);

RecoveryTests.java

@@ -306,7 +306,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
 try (ReplicationGroup shards = createGroup(0)) {
 shards.startAll();
 int numDocs = shards.indexDocs(between(10, 100));
-final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes();
+final long translogSizeOnPrimary = shards.getPrimary().translogStats().getUncommittedSizeInBytes();
 shards.flush();
 final IndexShard replica = shards.addReplica();