Flush big merges automatically if shard is inactive

Today, if a shard is marked as inactive after a heavy indexing period, large
merges are very likely still in flight. Yet those merges are never committed,
since time-based flush has been removed, and unless the shard becomes active
again we never revisit it.
At the same time, inactive shards have very likely been sync-flushed before, so
we need to preserve the sync id if possible, which this change attempts on a
per-shard basis.
Simon Willnauer 2015-10-24 22:45:17 +02:00
parent e2be57f395
commit 885af39fe2
5 changed files with 91 additions and 10 deletions
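
In short, the pieces below interact as follows. This is an editorial sketch, not code from the change itself; the class and stub names are illustrative only:

import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the shard/engine handoff introduced by this commit.
class FlushOnMergeModel {
    private final AtomicBoolean active = new AtomicBoolean(true);
    private volatile boolean flushWhenLastMergeFinished = false;

    // Mirrors IndexShard#markLastWrite: a write arrived, the shard is active
    // again, so merges no longer need to trigger a flush.
    void onWrite() {
        if (active.getAndSet(true) == false) {
            flushWhenLastMergeFinished = false;
        }
    }

    // Mirrors IndexShard#checkIdle: the shard went inactive, so the results of
    // any still-running merges should be committed once they finish.
    void onIdle() {
        active.set(false);
        flushWhenLastMergeFinished = true;
    }

    // Mirrors the merge-finished hook in InternalEngine: prefer renewing the
    // sync commit to keep the sync id, fall back to a real flush otherwise.
    void onMergeFinished(boolean hasPendingMerges) {
        if (flushWhenLastMergeFinished && hasPendingMerges == false) {
            if (tryRenewSyncCommit() == false) {
                flush();
            }
        }
    }

    boolean tryRenewSyncCommit() { return true; } // stub
    void flush() {}                               // stub
}

In the actual diff, onWrite/onIdle correspond to markLastWrite and checkIdle in the IndexShard hunks further down, and onMergeFinished to the new block in InternalEngine.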

Engine.java

@@ -562,6 +562,9 @@ public abstract class Engine implements Closeable {
    public interface EventListener {
        /**
         * Called when a fatal exception occurred
         */
        default void onFailedEngine(String reason, @Nullable Throwable t) {}
    }

EngineConfig.java

@@ -57,6 +57,7 @@ public final class EngineConfig {
    private volatile boolean compoundOnFlush = true;
    private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
    private volatile boolean enableGcDeletes = true;
    private volatile boolean flushWhenLastMergeFinished = false;
    private final String codecName;
    private final ThreadPool threadPool;
    private final ShardIndexingService indexingService;
@@ -399,4 +400,18 @@ public final class EngineConfig {
    public boolean isCreate() {
        return create;
    }
    /**
     * Returns <code>true</code> iff the engine should be flushed once the last merge finished.
     */
    public boolean isFlushWhenLastMergeFinished() {
        return flushWhenLastMergeFinished;
    }
    /**
     * Set to <code>true</code> iff the engine should be flushed once the last merge finished.
     */
    public void setFlushWhenLastMergeFinished(boolean flushWhenLastMergeFinished) {
        this.flushWhenLastMergeFinished = flushWhenLastMergeFinished;
    }
}

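The flag is volatile because it is flipped on the shard's write path (see the IndexShard hunks below) but read on Lucene merge threads. A minimal, self-contained demonstration of that cross-thread visibility pattern; all names here are ours, not from the change:

import java.util.concurrent.TimeUnit;

class VolatileFlagDemo {
    private static volatile boolean flushWhenLastMergeFinished = false;

    public static void main(String[] args) throws InterruptedException {
        Thread mergeThread = new Thread(() -> {
            // simulated merge thread: observes the flag without any locking
            while (flushWhenLastMergeFinished == false) {
                // busy-wait, for demonstration purposes only
            }
            System.out.println("merge thread observed the flag; would flush now");
        });
        mergeThread.start();
        TimeUnit.MILLISECONDS.sleep(100);
        flushWhenLastMergeFinished = true; // "indexing" thread marks the shard idle
        mergeThread.join();
    }
}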
InternalEngine.java

@@ -569,6 +569,24 @@ public class InternalEngine extends Engine {
        }
    }
    final boolean tryRenewSyncCommit() {
        try (ReleasableLock lock = writeLock.acquire()) {
            ensureOpen();
            String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
            // renewing is only safe if no operations arrived since the last sync flush (empty
            // translog) while merges left uncommitted changes in the writer
            if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) {
                logger.trace("start renewing sync commit [{}]", syncId);
                commitIndexWriter(indexWriter, translog, syncId);
                logger.debug("successfully sync committed. sync id [{}].", syncId);
                lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
                return true;
            }
            return false;
        } catch (IOException ex) {
            maybeFailEngine("renew sync commit", ex);
            throw new EngineException(shardId, "failed to renew sync commit", ex);
        }
    }
    @Override
    public CommitId flush() throws EngineException {
        return flush(false, false);
@@ -1055,6 +1073,21 @@ public class InternalEngine extends Engine {
                    deactivateThrottling();
                }
            }
            if (engineConfig.isFlushWhenLastMergeFinished() && indexWriter.hasPendingMerges() == false) {
                // If we have no pending merges and we are supposed to flush once merges have
                // finished, we try to renew the sync commit, which is the common case when a big
                // merge finishes after the shard became inactive. If that fails we fall back to a
                // real flush, which is fine since renewing only fails if the translog contains
                // operations or if there is no sync id at all.
                try {
                    if (tryRenewSyncCommit() == false) {
                        flush();
                    }
                } catch (EngineClosedException | EngineException ex) {
                    if (isClosed.get() == false) {
                        logger.warn("failed to flush after merge has finished", ex);
                    }
                }
            }
        }
        @Override

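For context on what tryRenewSyncCommit actually renews: the sync id lives in the user data of the Lucene commit point. Below is a standalone sketch against the Lucene 5.x API in use at the time; the literal "sync_id" key mirrors the Engine.SYNC_COMMIT_ID constant, and the class name is ours:

import java.util.Collections;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SyncIdCommitSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new RAMDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                Document doc = new Document();
                doc.add(new TextField("field", "value", Field.Store.NO));
                writer.addDocument(doc);
                // attach the sync id to the commit point; carrying a sync id
                // across a commit boils down to exactly this
                writer.setCommitData(Collections.singletonMap("sync_id", "some-sync-id"));
                writer.commit();
            }
            // read the sync id back from the last commit's user data
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                System.out.println(reader.getIndexCommit().getUserData().get("sync_id"));
            }
        }
    }
}

Renewing then amounts to writing a fresh commit that repeats the previous sync id, which is only legal while the translog holds no new operations.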
IndexShard.java

@@ -912,6 +912,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
    private void markLastWrite(Engine.Operation op) {
        lastWriteNS = op.startTime();
        if (active.getAndSet(true) == false) {
            // the shard is active again, so merges no longer need to trigger a flush
            engineConfig.setFlushWhenLastMergeFinished(false);
            // We are currently inactive, but a new write operation just showed up, so we now notify IMC
            // to wake up and fix our indexing buffer. We could do this async instead, but cost should
            // be low, and it's rare this happens.
@@ -1032,6 +1033,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
    public boolean checkIdle(long inactiveTimeNS) {
        if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) {
            boolean wasActive = active.getAndSet(false);
            // the shard went idle: commit the results of any still-running merges once they finish
            engineConfig.setFlushWhenLastMergeFinished(true);
            if (wasActive) {
                updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
                logger.debug("shard is now inactive");

InternalEngineTests.java

@@ -28,16 +28,7 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
@@ -794,6 +785,43 @@ public class InternalEngineTests extends ESTestCase {
        }
    }
    public void testRenewSyncFlush() throws IOException {
        try (Store store = createStore();
             InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
                 new LogDocMergePolicy()), false)) {
            final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
            ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
            engine.index(new Engine.Index(newUid("1"), doc));
            engine.flush();
            engine.index(new Engine.Index(newUid("2"), doc));
            engine.flush();
            engine.index(new Engine.Index(newUid("3"), doc));
            Engine.CommitId commitID = engine.flush();
            assertEquals("should succeed to flush commit with right id and no pending doc", Engine.SyncedFlushResult.SUCCESS,
                engine.syncFlush(syncId, commitID));
            assertEquals(3, engine.segments(false).size());
            engine.engineConfig.setFlushWhenLastMergeFinished(randomBoolean());
            engine.forceMerge(false, 1, false, false, false);
            if (engine.engineConfig.isFlushWhenLastMergeFinished() == false) {
                // the engine did not flush on its own, so the sync commit must be renewed manually
                engine.refresh("make all segments visible");
                assertEquals(4, engine.segments(false).size());
                assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
                assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
                assertTrue(engine.tryRenewSyncCommit());
            }
            engine.refresh("let old segments go");
            assertEquals(1, engine.segments(false).size());
            assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
            assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
            // a new operation in the translog invalidates the sync id, so renewing must fail
            engine.index(new Engine.Index(newUid("4"), doc));
            assertFalse(engine.tryRenewSyncCommit());
            engine.flush();
            assertNull(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID));
            assertNull(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
        }
    }
    public void testSyncedFlushSurvivesEngineRestart() throws IOException {
        final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);