Push last modified timestamp to engine and use a time delta to flush merges

Simon Willnauer 2015-10-26 10:37:11 +01:00
parent 885af39fe2
commit 8be506224d
8 changed files with 122 additions and 75 deletions
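
In short, the change replaces the volatile flushWhenLastMergeFinished flag with a timestamp-plus-interval scheme: every index and delete operation records its start time as lastWriteNanos in the engine, and once all merges have finished, the engine flushes if the last write is older than EngineConfig#getFlushMergesAfter(). A minimal sketch of that decision, where the helper class and method names are illustrative rather than part of the commit:

import org.elasticsearch.common.unit.TimeValue;

// Illustrative helper (not in the commit): the flush-after-merge check reduces
// to a monotonic time delta against the engine's last recorded write.
final class FlushAfterMergeCheck {
    static boolean shouldFlush(long lastWriteNanos, TimeValue flushMergesAfter, boolean hasPendingMerges) {
        // flush only when merging is quiescent and indexing has been idle long enough
        return hasPendingMerges == false
                && System.nanoTime() - lastWriteNanos >= flushMergesAfter.nanos();
    }
}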

Engine.java

@@ -54,6 +54,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -79,6 +80,7 @@ public abstract class Engine implements Closeable {
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
protected volatile Throwable failedEngine = null;
protected volatile long lastWriteNanos;
protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
@@ -1043,4 +1045,16 @@ public abstract class Engine implements Closeable {
public void onSettingsChanged() {
}
/**
* Returns the timestamp of the last write in nanoseconds.
* Note: this time might not be absolutely accurate since {@link Operation#startTime()} is used, which can be
* slightly imprecise.
* @see System#nanoTime()
* @see Operation#startTime()
* @return the timestamp of the last write in nanoseconds
*/
public long getLastWriteNanos() {
return this.lastWriteNanos;
}
}
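
Since lastWriteNanos is taken from System#nanoTime(), it is only meaningful as a delta against another nanoTime() reading, never as an absolute or wall-clock value; a hypothetical caller checks idleness by subtraction:

// Hypothetical caller: nanoTime values must be compared via deltas.
long idleNanos = System.nanoTime() - engine.getLastWriteNanos();
boolean idle = idleNanos >= inactiveTime.nanos(); // inactiveTime is a TimeValue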

EngineConfig.java

@@ -57,7 +57,7 @@ public final class EngineConfig {
private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private volatile boolean enableGcDeletes = true;
private volatile boolean flushWhenLastMergeFinished = false;
private final TimeValue flushMergesAfter = TimeValue.timeValueMinutes(5);
private final String codecName;
private final ThreadPool threadPool;
private final ShardIndexingService indexingService;
@@ -119,7 +119,7 @@
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter) {
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
@@ -402,16 +402,10 @@
}
/**
* Returns <code>true</code> iff the engine should be flushed once the last merge has finished.
* Returns a {@link TimeValue} giving the interval after the last write to the engine beyond which finished merges
* should be automatically flushed. This is used to free up transient disk usage of potentially large segments that
* are written after the engine became inactive from an indexing perspective.
*/
public boolean isFlushWhenLastMergeFinished() {
return flushWhenLastMergeFinished;
}
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
/**
* Set to <code>true</code> iff the engine should be flushed once the last merge has finished.
*/
public void setFlushWhenLastMergeFinished(boolean flushWhenLastMergeFinished) {
this.flushWhenLastMergeFinished = flushWhenLastMergeFinished;
}
}
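
Making flushMergesAfter a final field fixed at construction (instead of the removed volatile boolean) means the merge thread reads immutable configuration rather than racing with IndexShard flipping a flag; only lastWriteNanos still changes at runtime. A consumer-side sketch, assuming the wiring shown elsewhere in this commit:

// Consumer-side sketch (assumed wiring): no mutable flag is needed, only the
// fixed interval plus the engine's last write timestamp.
if (System.nanoTime() - engine.getLastWriteNanos() >= engineConfig.getFlushMergesAfter().nanos()) {
    // eligible to flush once pending merges have drained
}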

InternalEngine.java

@@ -353,6 +353,7 @@ public class InternalEngine extends Engine {
private boolean innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
lastWriteNanos = index.startTime();
final long currentVersion;
final boolean deleted;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
@@ -464,6 +465,7 @@
private void innerDelete(Delete delete) throws IOException {
synchronized (dirtyLock(delete.uid())) {
lastWriteNanos = delete.startTime();
final long currentVersion;
final boolean deleted;
VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes());
@@ -570,6 +572,7 @@
}
final boolean tryRenewSyncCommit() {
boolean renewed = false;
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
@@ -578,13 +581,17 @@
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
return true;
renewed = true;
}
return false;
} catch (IOException ex) {
maybeFailEngine("renew sync commit", ex);
throw new EngineException(shardId, "failed to renew sync commit", ex);
}
if (renewed) { // refresh outside of the write lock
refresh("version_table_flush");
}
return renewed;
}
@Override
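
The reworked tryRenewSyncCommit follows a classic shape: mutate and record under the lock, then run side effects only after the lock is released, so the refresh (which may itself take engine locks) cannot deadlock against the write lock. Schematically, with mutateUnderLock and sideEffect as hypothetical stand-ins:

// Generic shape of the refactor (hypothetical method names):
boolean changed;
try (ReleasableLock lock = writeLock.acquire()) {
    changed = mutateUnderLock(); // commit + bookkeeping while locked
}
if (changed) {
    sideEffect(); // e.g. refresh(); safe, the write lock has been released
}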
@@ -1073,20 +1080,31 @@
deactivateThrottling();
}
}
if (engineConfig.isFlushWhenLastMergeFinished() && indexWriter.hasPendingMerges() == false) {
// if we have no pending merges and we are supposed to flush once merges have finished,
// we try to renew a sync commit, which is the case when we get a big merge after we
// have gone inactive. If that doesn't work we do a real flush, which is fine since renewing
// only fails if we either have records in the translog or no sync ID at all...
try {
if (tryRenewSyncCommit() == false) {
flush();
if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
// NEVER do this on a merge thread since we acquire blocking locks here and, if we concurrently
// roll back the writer, we would deadlock, for instance in engine#close.
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
if (isClosed.get() == false) {
logger.warn("failed to flush after merge has finished");
}
}
} catch (EngineClosedException | EngineException ex) {
if (isClosed.get() == false) {
logger.warn("failed to flush after merge has finished");
@Override
protected void doRun() throws Exception {
// if we have no pending merges and we are supposed to flush once merges have finished,
// we try to renew a sync commit, which is the case when we get a big merge after we
// have gone inactive. If that doesn't work we do a real flush, which is fine since renewing
// only fails if we either have records in the translog or no sync ID at all...
// maybe even more importantly, we flush after all merges finish while we are inactive indexing-wise,
// to free up transient disk usage of the (presumably biggish) segments that were just merged
if (tryRenewSyncCommit() == false) {
flush();
}
}
}
});
}
}
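
Note the hand-off: the merge listener runs on a Lucene merge thread, so the flush work is pushed to the FLUSH thread pool rather than executed inline, because blocking on engine locks from a merge thread can deadlock against a concurrent IndexWriter rollback, e.g. in engine#close. Condensed to its essentials, the pattern from the hunk above is:

// Condensed shape of the hand-off: never block a merge thread on engine locks.
threadPool.executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
    @Override
    public void onFailure(Throwable t) {
        logger.warn("failed to flush after merge has finished");
    }

    @Override
    protected void doRun() throws Exception {
        if (tryRenewSyncCommit() == false) { // cheap path: reuse the existing sync id
            flush(); // otherwise fall back to a real flush
        }
    }
});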

IndexShard.java

@@ -193,7 +193,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
* IndexingMemoryController}). */
private final AtomicBoolean active = new AtomicBoolean();
private volatile long lastWriteNS;
private final IndexingMemoryController indexingMemoryController;
@Inject
@@ -458,7 +457,7 @@
*/
public boolean index(Engine.Index index) {
ensureWriteAllowed(index);
markLastWrite(index);
markLastWrite();
index = indexingService.preIndex(index);
final boolean created;
try {
@@ -483,7 +482,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public void delete(Engine.Delete delete) {
ensureWriteAllowed(delete);
markLastWrite(delete);
markLastWrite();
delete = indexingService.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
@@ -903,16 +902,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
}
/** Returns timestamp of last indexing operation */
public long getLastWriteNS() {
return lastWriteNS;
}
/** Notes that a write operation happened, switching {@code active} to true if we were inactive. */
private void markLastWrite(Engine.Operation op) {
lastWriteNS = op.startTime();
private void markLastWrite() {
if (active.getAndSet(true) == false) {
engineConfig.setFlushWhenLastMergeFinished(false);
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
// be low, and it's rare this happens.
@@ -1031,9 +1023,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true
* if the shard is inactive. */
public boolean checkIdle(long inactiveTimeNS) {
if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
engineConfig.setFlushWhenLastMergeFinished(true);
if (wasActive) {
updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
logger.debug("shard is now inactive");
@@ -1463,7 +1455,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexingMemoryController.getInactiveTime());
}
private static class IndexShardOperationCounter extends AbstractRefCounted {
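
On the shard side, checkIdle now reads the timestamp from the engine and guards with getEngineOrNull, so a shard whose engine has not started yet (or has already closed) is simply skipped; the null check short-circuits before the engine is dereferenced. A caller-side sketch of the check that IndexingMemoryController drives periodically:

// Caller-side sketch: skip shards without a live engine, then compare deltas.
Engine engine = getEngineOrNull();
if (engine != null && System.nanoTime() - engine.getLastWriteNanos() >= inactiveTimeNS) {
    // the shard went idle: shrink indexing and translog buffers
}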

IndexingMemoryController.java

@@ -422,4 +422,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private static enum ShardStatusChangeType {
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
}
public TimeValue getInactiveTime() {
return inactiveTime;
}
}
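
The engine's flushMergesAfter is thus the same interval the memory controller already uses to declare shards idle. That interval defaults to five minutes and, as an assumption not shown in this diff, is read from the indices.memory.shard_inactive_time node setting; a sketch of that wiring:

// Assumed wiring (not in this diff): the controller reads its inactivity
// interval from a node setting, defaulting to 5 minutes.
TimeValue inactiveTime = settings.getAsTime("indices.memory.shard_inactive_time", TimeValue.timeValueMinutes(5));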

InternalEngineTests.java

@@ -53,6 +53,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
@@ -107,6 +108,7 @@ import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -265,7 +267,7 @@ public class InternalEngineTests extends ESTestCase {
public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}
}, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
}, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {
@@ -785,40 +787,61 @@
}
}
public void testRenewSyncFlush() throws IOException {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
new LogDocMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
engine.flush();
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
engine.index(new Engine.Index(newUid("3"), doc));
Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Engine.SyncedFlushResult.SUCCESS);
assertEquals(3, engine.segments(false).size());
engine.engineConfig.setFlushWhenLastMergeFinished(randomBoolean());
engine.forceMerge(false, 1, false, false, false);
if (engine.engineConfig.isFlushWhenLastMergeFinished() == false) {
engine.refresh("make all segments visible");
assertEquals(4, engine.segments(false).size());
public void testRenewSyncFlush() throws Exception {
final int iters = randomIntBetween(2, 5); // run this a couple of times to get some coverage
for (int i = 0; i < iters; i++) {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
new LogDocMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
Engine.Index doc1 = new Engine.Index(newUid("1"), doc);
engine.index(doc1);
assertEquals(engine.getLastWriteNanos(), doc1.startTime());
engine.flush();
Engine.Index doc2 = new Engine.Index(newUid("2"), doc);
engine.index(doc2);
assertEquals(engine.getLastWriteNanos(), doc2.startTime());
engine.flush();
final boolean forceMergeFlushes = randomBoolean();
if (forceMergeFlushes) {
engine.index(new Engine.Index(newUid("3"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime() - engine.engineConfig.getFlushMergesAfter().nanos()));
} else {
engine.index(new Engine.Index(newUid("3"), doc));
}
Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Engine.SyncedFlushResult.SUCCESS);
assertEquals(3, engine.segments(false).size());
engine.forceMerge(false, 1, false, false, false);
if (forceMergeFlushes == false) {
engine.refresh("make all segments visible");
assertEquals(4, engine.segments(false).size());
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
assertTrue(engine.tryRenewSyncCommit());
assertEquals(1, engine.segments(false).size());
} else {
assertBusy(() -> assertEquals(1, engine.segments(false).size()));
}
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
assertTrue(engine.tryRenewSyncCommit());
}
engine.refresh("let old segments go");
assertEquals(1, engine.segments(false).size());
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
engine.index(new Engine.Index(newUid("4"), doc));
assertFalse(engine.tryRenewSyncCommit());
engine.flush();
assertNull(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID));
assertNull(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
if (randomBoolean()) {
Engine.Index doc4 = new Engine.Index(newUid("4"), doc);
engine.index(doc4);
assertEquals(engine.getLastWriteNanos(), doc4.startTime());
} else {
Engine.Delete delete = new Engine.Delete(doc1.type(), doc1.id(), doc1.uid());
engine.delete(delete);
assertEquals(engine.getLastWriteNanos(), delete.startTime());
}
assertFalse(engine.tryRenewSyncCommit());
engine.flush();
assertNull(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID));
assertNull(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
}
}
}
@@ -1948,7 +1971,7 @@ public class InternalEngineTests extends ESTestCase {
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getEventListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
new InternalEngine(brokenConfig, false);

ShadowEngineTests.java

@@ -43,6 +43,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.codec.CodecService;
@@ -226,7 +227,7 @@ public class ShadowEngineTests extends ESTestCase {
@Override
public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
try {
config.setCreate(Lucene.indexExists(store.directory()) == false);
} catch (IOException e) {

IndexSearcherWrapperTests.java

@@ -32,6 +32,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
@@ -45,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class IndexSearcherWrapperTests extends ESTestCase {
private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null);
private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null, TimeValue.timeValueMinutes(5));
public void testReaderCloseListenerIsCalled() throws IOException {
Directory dir = newDirectory();