From 8be506224dfebed36b2acfc5186a5728efe8b713 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 26 Oct 2015 10:37:11 +0100 Subject: [PATCH] Push last modified timestamp to engine and use a time delta to flush merges --- .../elasticsearch/index/engine/Engine.java | 14 +++ .../index/engine/EngineConfig.java | 18 ++-- .../index/engine/InternalEngine.java | 46 +++++++--- .../elasticsearch/index/shard/IndexShard.java | 20 ++--- .../memory/IndexingMemoryController.java | 4 + .../index/engine/InternalEngineTests.java | 89 ++++++++++++------- .../index/engine/ShadowEngineTests.java | 3 +- .../shard/IndexSearcherWrapperTests.java | 3 +- 8 files changed, 122 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index a035b33c18e..8fabb570df0 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -79,6 +80,7 @@ public abstract class Engine implements Closeable { protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); protected volatile Throwable failedEngine = null; + protected volatile long lastWriteNanos; protected Engine(EngineConfig engineConfig) { Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine"); @@ -1043,4 +1045,16 @@ public abstract class Engine implements Closeable { public void onSettingsChanged() { } + + /** + * Returns the timestamp of the last write in nanoseconds. 
+ * Note: this time might not be absolutely accurate since the {@link Operation#startTime()} is used which might be + * slightly inaccurate. + * @see System#nanoTime() + * @see Operation#startTime() + * @return the timestamp of the last write, in nanoseconds + */ + public long getLastWriteNanos() { + return this.lastWriteNanos; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 4fd6f36b852..6cc5ac12ada 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -57,7 +57,7 @@ public final class EngineConfig { private volatile boolean compoundOnFlush = true; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); private volatile boolean enableGcDeletes = true; - private volatile boolean flushWhenLastMergeFinished = false; + private final TimeValue flushMergesAfter = TimeValue.timeValueMinutes(5); private final String codecName; private final ThreadPool threadPool; private final ShardIndexingService indexingService; @@ -119,7 +119,7 @@ public final class EngineConfig { Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, - TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) { + TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -402,16 +402,10 @@ public final class EngineConfig { } /** - * Returns true iff then engine should be flushed once the last merged finished. 
+ * Returns the {@link TimeValue} interval after the last write modification to the engine at which finished merges + * should be automatically flushed. This is used to free up transient disk usage of potentially large segments that + * are written after the engine became inactive from an indexing perspective. */ - public boolean isFlushWhenLastMergeFinished() { - return flushWhenLastMergeFinished; - } + public TimeValue getFlushMergesAfter() { return flushMergesAfter; } - /** - * Set to true iff then engine should be flushed once the last merged finished. - */ - public void setFlushWhenLastMergeFinished(boolean flushWhenLastMergeFinished) { - this.flushWhenLastMergeFinished = flushWhenLastMergeFinished; - } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7fcfb5ac88c..f7e9ac17746 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -353,6 +353,7 @@ public class InternalEngine extends Engine { private boolean innerIndex(Index index) throws IOException { synchronized (dirtyLock(index.uid())) { + lastWriteNanos = index.startTime(); final long currentVersion; final boolean deleted; VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); @@ -464,6 +465,7 @@ public class InternalEngine extends Engine { private void innerDelete(Delete delete) throws IOException { synchronized (dirtyLock(delete.uid())) { + lastWriteNanos = delete.startTime(); final long currentVersion; final boolean deleted; VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes()); @@ -570,6 +572,7 @@ public class InternalEngine extends Engine { } final boolean tryRenewSyncCommit() { + boolean renewed = false; try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); 
@@ -578,13 +581,17 @@ public class InternalEngine extends Engine { commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - return true; + renewed = true; } - return false; } catch (IOException ex) { maybeFailEngine("renew sync commit", ex); throw new EngineException(shardId, "failed to renew sync commit", ex); } + if (renewed) { // refresh outside of the write lock + refresh("version_table_flush"); + } + + return renewed; } @Override @@ -1073,20 +1080,31 @@ public class InternalEngine extends Engine { deactivateThrottling(); } } - if (engineConfig.isFlushWhenLastMergeFinished() && indexWriter.hasPendingMerges() == false) { - // if we have no pending merges and we are supposed to flush once merges have finished - // we try to renew a sync commit which is the case when we are having a big merge after we - // are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work - // if we either have records in the translog or if we don't have a sync ID at all... - try { - if (tryRenewSyncCommit() == false) { - flush(); + if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { + // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer + // we deadlock on engine#close for instance. 
+ engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + if (isClosed.get() == false) { + logger.warn("failed to flush after merge has finished"); + } } - } catch (EngineClosedException | EngineException ex) { - if (isClosed.get() == false) { - logger.warn("failed to flush after merge has finished"); + + @Override + protected void doRun() throws Exception { + // if we have no pending merges and we are supposed to flush once merges have finished + // we try to renew a sync commit which is the case when we are having a big merge after we + // are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work + // if we either have records in the translog or if we don't have a sync ID at all... + // maybe even more important, we flush after all merges finish and we are inactive indexing-wise to + // free up transient disk usage of the (presumably biggish) segments that were just merged + if (tryRenewSyncCommit() == false) { + flush(); + } } - } + }); + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index da2d71a7076..f0772f88603 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -193,7 +193,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett * IndexingMemoryController}). 
*/ private final AtomicBoolean active = new AtomicBoolean(); - private volatile long lastWriteNS; private final IndexingMemoryController indexingMemoryController; @Inject @@ -458,7 +457,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett */ public boolean index(Engine.Index index) { ensureWriteAllowed(index); - markLastWrite(index); + markLastWrite(); index = indexingService.preIndex(index); final boolean created; try { @@ -483,7 +482,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett public void delete(Engine.Delete delete) { ensureWriteAllowed(delete); - markLastWrite(delete); + markLastWrite(); delete = indexingService.preDelete(delete); try { if (logger.isTraceEnabled()) { @@ -903,16 +902,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett } } - /** Returns timestamp of last indexing operation */ - public long getLastWriteNS() { - return lastWriteNS; - } - /** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */ - private void markLastWrite(Engine.Operation op) { - lastWriteNS = op.startTime(); + private void markLastWrite() { if (active.getAndSet(true) == false) { - engineConfig.setFlushWhenLastMergeFinished(false); // We are currently inactive, but a new write operation just showed up, so we now notify IMC // to wake up and fix our indexing buffer. We could do this async instead, but cost should // be low, and it's rare this happens. @@ -1031,9 +1023,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true * if the shard is inactive. 
*/ public boolean checkIdle(long inactiveTimeNS) { - if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) { + Engine engineOrNull = getEngineOrNull(); + if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) { boolean wasActive = active.getAndSet(false); - engineConfig.setFlushWhenLastMergeFinished(true); if (wasActive) { updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); logger.debug("shard is now inactive"); @@ -1463,7 +1455,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett }; return new EngineConfig(shardId, threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, - mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig); + mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexingMemoryController.getInactiveTime()); } private static class IndexShardOperationCounter extends AbstractRefCounted { diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 90bb4c41a38..1e3ede365fa 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -422,4 +422,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent assertEquals(1, engine.segments(false).size())); + } assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); 
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertTrue(engine.tryRenewSyncCommit()); - } - engine.refresh("let old segments go"); - assertEquals(1, engine.segments(false).size()); - assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - engine.index(new Engine.Index(newUid("4"), doc)); - assertFalse(engine.tryRenewSyncCommit()); - engine.flush(); - assertNull(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID)); - assertNull(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); + if (randomBoolean()) { + Engine.Index doc4 = new Engine.Index(newUid("4"), doc); + engine.index(doc4); + assertEquals(engine.getLastWriteNanos(), doc4.startTime()); + } else { + Engine.Delete delete = new Engine.Delete(doc1.type(), doc1.id(), doc1.uid()); + engine.delete(delete); + assertEquals(engine.getLastWriteNanos(), delete.startTime()); + } + assertFalse(engine.tryRenewSyncCommit()); + engine.flush(); + assertNull(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID)); + assertNull(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); + } } } @@ -1948,7 +1971,7 @@ public class InternalEngineTests extends ESTestCase { EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings() , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(), config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getEventListener() - , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), 
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); try { new InternalEngine(brokenConfig, false); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 525c9d416aa..3023176bad4 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.Index; import org.elasticsearch.index.codec.CodecService; @@ -226,7 +227,7 @@ public class ShadowEngineTests extends ESTestCase { @Override public void onFailedEngine(String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test - }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5)); try { config.setCreate(Lucene.indexExists(store.directory()) == false); } catch (IOException e) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java index e8a7a75b1d6..288a691b88b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java @@ -32,6 +32,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.IOUtils; import 
org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; @@ -45,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** */ public class IndexSearcherWrapperTests extends ESTestCase { - private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null); + private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null, TimeValue.timeValueMinutes(5)); public void testReaderCloseListenerIsCalled() throws IOException { Directory dir = newDirectory();