Fix peer recovery flushing loop (#28350)

Today after writing an operation to an engine, we will call 
`IndexShard#afterWriteOperation` to flush a new commit if needed. The 
`shouldFlush` condition is purely based on the uncommitted translog size
and the translog flush threshold size setting. However, this can cause a
replica to execute an infinite loop of flushing in the following situation.

1. Primary has a fully baked index commit with its local checkpoint
equal to max_seqno
2. Primary sends that fully baked commit, then replays all retained
translog operations to the replica
3. No operations are added to Lucene on the replica as the seqnos of these
operations are at most the local checkpoint
4. Once translog operations are replayed, the target calls 
`IndexShard#afterWriteOperation` to flush. If the total size of the
replaying operations exceeds the flush threshold size, this call will
invoke `Engine#flush`. However the engine won't flush as its index writer does
not have any uncommitted operations. The method
`IndexShard#afterWriteOperation` will keep flushing as the condition
`shouldFlush` is still true.

This issue can be avoided if we always flush if the `shouldFlush` 
condition is true.
This commit is contained in:
Nhat Nguyen 2018-01-25 14:29:46 -05:00 committed by GitHub
parent fd66c94ce1
commit f39402a039
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 112 additions and 30 deletions

View File

@ -817,6 +817,12 @@ public abstract class Engine implements Closeable {
// NOTE: do NOT rename this to something containing flush or refresh! // NOTE: do NOT rename this to something containing flush or refresh!
public abstract void writeIndexingBuffer() throws EngineException; public abstract void writeIndexingBuffer() throws EngineException;
/**
* Checks if this engine should be flushed periodically.
* This check is mainly based on the uncommitted translog size and the translog flush threshold setting.
*
* @return {@code true} if the engine should be flushed
*/
public abstract boolean shouldPeriodicallyFlush();
/** /**
* Flushes the state of the engine including the transaction log, clearing memory. * Flushes the state of the engine including the transaction log, clearing memory.
* *

View File

@ -1462,6 +1462,31 @@ public class InternalEngine extends Engine {
return renewed; return renewed;
} }
/**
 * Decides whether a periodic flush would be useful for this engine.
 * <p>
 * The engine must be open. A flush is only worthwhile once the uncommitted translog size has reached the
 * configured flush threshold, and only if flushing would actually shrink the uncommitted translog.
 *
 * @return {@code true} if a flush should be performed to trim the translog, {@code false} otherwise
 */
@Override
public boolean shouldPeriodicallyFlush() {
    ensureOpen();
    final long thresholdInBytes = config().getIndexSettings().getFlushThresholdSize().getBytes();
    final long uncommittedBytesNow = translog.uncommittedSizeInBytes();
    if (uncommittedBytesNow >= thresholdInBytes) {
        /*
         * Flushing is only useful if the shouldFlush condition can become false afterwards, i.e. if the
         * uncommitted size of the new commit would be smaller than the uncommitted size of the current
         * commit. This method exists to maintain the translog only, so IndexWriter#hasUncommittedChanges
         * is deliberately not consulted here.
         */
        final long uncommittedBytesAfterFlush = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1);
        if (uncommittedBytesAfterFlush >= uncommittedBytesNow) {
            return false;
        }
        /*
         * With a very small flush threshold we could flush repeatedly even though there are no uncommitted
         * operations, because #sizeOfGensAboveSeqNoInBytes and #uncommittedSizeInBytes can disagree: an empty
         * translog file still has a non-zero uncommitted size (the translog header), so the former can return 0
         * now (no generation contains ops above the local checkpoint) while the latter returns a non-zero value
         * after a new generation is rolled. Checking the actual number of uncommitted operations avoids that.
         */
        return translog.uncommittedOperations() > 0;
    }
    return false;
}
@Override @Override
public CommitId flush() throws EngineException { public CommitId flush() throws EngineException {
return flush(false, false); return flush(false, false);
@ -1492,7 +1517,9 @@ public class InternalEngine extends Engine {
logger.trace("acquired flush lock immediately"); logger.trace("acquired flush lock immediately");
} }
try { try {
if (indexWriter.hasUncommittedChanges() || force) { // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
// newly created commit points to a different translog generation (can free translog)
if (indexWriter.hasUncommittedChanges() || force || shouldPeriodicallyFlush()) {
ensureCanFlush(); ensureCanFlush();
try { try {
translog.rollGeneration(); translog.rollGeneration();

View File

@ -1600,17 +1600,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
/** /**
* Tests whether or not the translog should be flushed. This test is based on the current size of the translog comparted to the * Tests whether or not the engine should be flushed periodically.
* configured flush threshold size. * This test is based on the current size of the translog compared to the configured flush threshold size.
* *
* @return {@code true} if the translog should be flushed * @return {@code true} if the engine should be flushed
*/ */
boolean shouldFlush() { boolean shouldPeriodicallyFlush() {
final Engine engine = getEngineOrNull(); final Engine engine = getEngineOrNull();
if (engine != null) { if (engine != null) {
try { try {
final Translog translog = engine.getTranslog(); return engine.shouldPeriodicallyFlush();
return translog.shouldFlush();
} catch (final AlreadyClosedException e) { } catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll // we are already closed, no need to flush or roll
} }
@ -2364,7 +2363,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* executed asynchronously on the flush thread pool. * executed asynchronously on the flush thread pool.
*/ */
public void afterWriteOperation() { public void afterWriteOperation() {
if (shouldFlush() || shouldRollTranslogGeneration()) { if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) {
if (flushOrRollRunning.compareAndSet(false, true)) { if (flushOrRollRunning.compareAndSet(false, true)) {
/* /*
* We have to check again since otherwise there is a race when a thread passes the first check next to another thread which * We have to check again since otherwise there is a race when a thread passes the first check next to another thread which
@ -2374,7 +2373,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Additionally, a flush implicitly executes a translog generation roll so if we execute a flush then we do not need to * Additionally, a flush implicitly executes a translog generation roll so if we execute a flush then we do not need to
* check if we should roll the translog generation. * check if we should roll the translog generation.
*/ */
if (shouldFlush()) { if (shouldPeriodicallyFlush()) {
logger.debug("submitting async flush request"); logger.debug("submitting async flush request");
final AbstractRunnable flush = new AbstractRunnable() { final AbstractRunnable flush = new AbstractRunnable() {
@Override @Override

View File

@ -436,7 +436,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/** /**
* Returns the size in bytes of the translog files with ops above the given seqNo * Returns the size in bytes of the translog files with ops above the given seqNo
*/ */
private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
try (ReleasableLock ignored = readLock.acquire()) { try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen(); ensureOpen();
return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum(); return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
@ -523,17 +523,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
} }
/**
 * Reports whether the translog should be flushed, by comparing the current uncommitted size
 * of the translog with the configured flush threshold size.
 *
 * @return {@code true} if the uncommitted translog size is strictly greater than the flush threshold size
 */
public boolean shouldFlush() {
    return this.uncommittedSizeInBytes() > this.indexSettings.getFlushThresholdSize().getBytes();
}
/** /**
* Tests whether or not the translog generation should be rolled to a new generation. This test * Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation * is based on the size of the current generation compared to the configured generation

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -46,6 +47,7 @@ import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues; import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
@ -163,6 +165,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.everyItem;
@ -4439,4 +4442,37 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1))); assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
} }
} }
/**
 * Exercises {@code Engine#shouldPeriodicallyFlush} on an empty engine, after indexing below the flush threshold,
 * after lowering the threshold, and after re-applying already-indexed operations as replica operations.
 */
public void testShouldPeriodicallyFlush() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
int numDocs = between(10, 100);
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
}
assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
// Pick a new flush threshold between 100 bytes and the current uncommitted translog size, then apply it to the engine.
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes());
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
// With the lowered threshold, all indexed operations are still uncommitted and a flush is now expected.
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
engine.flush();
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
// Stale operations skipped by Lucene but added to translog - still able to flush
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false));
assertThat(result.isCreated(), equalTo(false));
}
// Remember the last commit so we can verify below that the non-forced flush produced a different one.
SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos();
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
engine.flush(false, false);
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
}
} }

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
@ -73,7 +72,6 @@ import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
@ -332,23 +330,23 @@ public class IndexShardIT extends ESSingleNodeTestCase {
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test")); IndexService test = indicesService.indexService(resolveIndex("test"));
IndexShard shard = test.getShardOrNull(0); IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush()); assertFalse(shard.shouldPeriodicallyFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(117 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); new ByteSizeValue(117 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0") client().prepareIndex("test", "test", "0")
.setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertFalse(shard.shouldFlush()); assertFalse(shard.shouldPeriodicallyFlush());
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON), SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
assertTrue(shard.shouldFlush()); assertTrue(shard.shouldPeriodicallyFlush());
final Translog translog = shard.getEngine().getTranslog(); final Translog translog = shard.getEngine().getTranslog();
assertEquals(2, translog.uncommittedOperations()); assertEquals(2, translog.uncommittedOperations());
client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON)
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertBusy(() -> { // this is async assertBusy(() -> { // this is async
assertFalse(shard.shouldFlush()); assertFalse(shard.shouldPeriodicallyFlush());
}); });
assertEquals(0, translog.uncommittedOperations()); assertEquals(0, translog.uncommittedOperations());
translog.sync(); translog.sync();
@ -364,7 +362,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
assertBusy(() -> { // this is async assertBusy(() -> { // this is async
logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
translog.uncommittedOperations(), translog.getGeneration()); translog.uncommittedOperations(), translog.getGeneration());
assertFalse(shard.shouldFlush()); assertFalse(shard.shouldPeriodicallyFlush());
}); });
assertEquals(0, translog.uncommittedOperations()); assertEquals(0, translog.uncommittedOperations());
} }
@ -408,7 +406,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test")); IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0); final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush()); assertFalse(shard.shouldPeriodicallyFlush());
final String key; final String key;
final boolean flush = randomBoolean(); final boolean flush = randomBoolean();
if (flush) { if (flush) {
@ -423,7 +421,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
.setSource("{}", XContentType.JSON) .setSource("{}", XContentType.JSON)
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE)
.get(); .get();
assertFalse(shard.shouldFlush()); assertFalse(shard.shouldPeriodicallyFlush());
final AtomicBoolean running = new AtomicBoolean(true); final AtomicBoolean running = new AtomicBoolean(true);
final int numThreads = randomIntBetween(2, 4); final int numThreads = randomIntBetween(2, 4);
final Thread[] threads = new Thread[numThreads]; final Thread[] threads = new Thread[numThreads];

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.recovery; package org.elasticsearch.indices.recovery;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
@ -306,4 +307,30 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
} }
} }
/**
* This test makes sure that there is no infinite loop of flushing (the condition `shouldPeriodicallyFlush` eventually is false)
* in peer-recovery if a primary sends a fully-baked index commit.
*/
public void testShouldFlushAfterPeerRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startAll();
int numDocs = shards.indexDocs(between(10, 100));
// Capture the primary's uncommitted translog size before flushing; it is the upper bound for the random threshold below.
final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes();
shards.flush();
final IndexShard replica = shards.addReplica();
// Give the new replica a flush threshold between 100 bytes and the primary's pre-flush translog size before it recovers.
// NOTE(review): presumably this makes the replayed translog operations reach the threshold on the replica - confirm.
IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData());
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, translogSizeOnPrimary);
builder.settings(Settings.builder().put(replica.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")
);
replica.indexSettings().updateIndexMetaData(builder.build());
replica.onSettingsChanged();
shards.recoverReplica(replica);
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
// After recovery the replica's translog should report all indexed operations and every shard should hold the same documents.
assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs));
shards.assertAllEqual(numDocs);
}
}
} }