diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 292ef01b8d1..fee033feef3 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -189,6 +189,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett private final IndexSearcherWrapper searcherWrapper; + /** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link + * IndexingMemoryController}). */ + private final AtomicBoolean active = new AtomicBoolean(); + + private volatile long lastWriteNS; private final IndexingMemoryController indexingMemoryController; @Inject @@ -450,6 +455,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett */ public boolean index(Engine.Index index) { ensureWriteAllowed(index); + markLastWrite(index); index = indexingService.preIndex(index); final boolean created; try { @@ -474,6 +480,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett public void delete(Engine.Delete delete) { ensureWriteAllowed(delete); + markLastWrite(delete); delete = indexingService.preDelete(delete); try { if (logger.isTraceEnabled()) { @@ -883,6 +890,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett } } + /** Returns timestamp of last indexing operation */ + public long getLastWriteNS() { + return lastWriteNS; + } + + /** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */ + private void markLastWrite(Engine.Operation op) { + lastWriteNS = op.startTime(); + if (active.getAndSet(true) == false) { + // We are currently inactive, but a new write operation just showed up, so we now notify IMC + // to wake up and fix our indexing buffer. We could do this async instead, but cost should + // be low, and it's rare this happens. + indexingMemoryController.forceCheck(); + } + } + private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException { Engine.Operation.Origin origin = op.origin(); IndexShardState state = this.state; // one time volatile read @@ -954,6 +977,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett return engine.indexBufferRAMBytesUsed(); } + /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last + * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. */ + public void checkIdle(long inactiveTimeNS) { + if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) { + boolean wasActive = active.getAndSet(false); + if (wasActive) { + logger.debug("shard is now inactive"); + indicesLifecycle.onShardInactive(this); + } + } + } + + /** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link + * IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}. */ + public boolean getActive() { + return active.get(); + } + public final boolean isFlushOnClose() { return flushOnClose; } diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index aebaeccf483..821311ad636 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -51,6 +51,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponentindices.memory.index_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size"; + /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */ + public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time"; + /** How frequently we check indexing memory usage (default: 5 seconds). */ public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval"; @@ -62,6 +65,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent 0) { - totalBytesUsed += shardBytesUsed; - } + + // Give shard a chance to transition to inactive so sync'd flush can happen: + checkIdle(shardId, inactiveTime.nanos()); + + totalBytesUsed += getIndexBufferRAMBytesUsed(shardId); } if (totalBytesUsed > indexingBuffer.bytes()) { diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a224bc87fec..2b929f76ad3 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -91,6 +91,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogTests; +import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -1564,7 +1565,8 @@ public class InternalEngineTests extends ESTestCase { public void testDeletesAloneCanTriggerRefresh() throws Exception { Settings settings = Settings.builder() .put(defaultSettings) - .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build(); + .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb") + .put(IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING, "100ms").build(); try (Store store = createStore(); Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) { for (int i = 0; i < 100; i++) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ce3e4e51aba..fd402130092 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -339,6 +339,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { client().prepareIndex("test", "test").setSource("{}").get(); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); + // force the shard to become idle now: + indicesService.indexService("test").getShardOrNull(0).checkIdle(0); assertBusy(new Runnable() { // should be very very quick @Override public void run() { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java index 033d625a26d..2d54fb0a1fd 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -75,6 +75,10 @@ public class IndexingMemoryControllerTests extends ESTestCase { } } + @Override + protected void checkIdle(ShardId shardId, long inactiveTimeNS) { + } + @Override public void refreshShardAsync(ShardId shardId) { indexBufferRAMBytesUsed.put(shardId, 0L); @@ -103,7 +107,6 @@ public class IndexingMemoryControllerTests extends ESTestCase { final ShardId shard1 = new ShardId("test", 1); controller.simulateIndexing(shard1); controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB)); - // add another shard final ShardId shard2 = new ShardId("test", 2); @@ -144,7 +147,8 @@ public class IndexingMemoryControllerTests extends ESTestCase { controller.assertBuffer(shard1, new ByteSizeValue(2, ByteSizeUnit.MB)); controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB)); - // index into one shard only, hits the 5mb limit, so shard1 is refreshed + // index into one shard only, crosses the 5mb limit, so shard1 is refreshed + controller.simulateIndexing(shard1); controller.simulateIndexing(shard1); controller.assertBuffer(shard1, new ByteSizeValue(0, ByteSizeUnit.MB)); controller.assertBuffer(shard2, new ByteSizeValue(2, ByteSizeUnit.MB)); @@ -153,7 +157,8 @@ public class IndexingMemoryControllerTests extends ESTestCase { controller.simulateIndexing(shard2); controller.assertBuffer(shard2, new ByteSizeValue(4, ByteSizeUnit.MB)); controller.simulateIndexing(shard2); - // shard2 used up the full 5 mb and is now cleared: + controller.simulateIndexing(shard2); + // shard2 crossed 5 mb and is now cleared: controller.assertBuffer(shard2, new ByteSizeValue(0, ByteSizeUnit.MB)); }