From d121550a4f908982e340abbb1bd83e581a5ff652 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 24 Sep 2015 09:26:12 +0200 Subject: [PATCH] Internal: pending operations in the translog prevent shard from being marked as inactive The IndexingMemoryController checks periodically if there is any indexing activity on the shard. If no activity is sean for 5m (default) the shard is marked as inactive allowing it's indexing buffer quota to given to other active shards. Sadly the current check is bad as it checks for 0 translog operation. This makes the inactive wait for a flush to happen - which used to take 30m and since #13707 doesn't happen at all (as we rely on the synced flush triggered by inactivity). This commit fixes the check so it will work with any translog size. Closes #13759 --- .../memory/IndexingMemoryController.java | 11 ++++------- .../memory/IndexingMemoryControllerIT.java | 18 ++++++++++++++---- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 2bf6946bc32..f54020802b5 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -38,12 +38,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ScheduledFuture; /** @@ -258,7 +253,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent indexing some pending operations"); + indexRandom(false, client().prepareIndex("test1", "type", "0").setSource("f", "0")); + } + boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); if (!success) { fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + @@ -97,12 +104,15 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase { success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); if (!success) { - fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + fail("failed to update shard indexing buffer size due to active state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + shard1.engine().config().getIndexingBufferSize().bytes() + "]" ); } - flush(); // clean translogs + if (randomBoolean()) { + logger.info("--> flushing translogs"); + flush(); // clean translogs + } success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); if (!success) {