diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 93b4ded9d05..2fc7a242db1 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -288,7 +288,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone indicesLifecycle.afterIndexShardCreated(indexShard); settingsService.addListener(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); - indexServicesProvider.getIndexingMemoryController().forceCheck(); success = true; return indexShard; } catch (IOException e) { diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 567f1e01eed..d55300383f2 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndicesWarmer; +import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.TimeUnit; @@ -107,7 +108,6 @@ public final class EngineConfig { public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS); public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); - public static final ByteSizeValue DEFAULT_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB); public static final String DEFAULT_VERSION_MAP_SIZE = "25%"; @@ -138,7 +138,8 @@ public final class EngineConfig { this.failedEngineListener = failedEngineListener; this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME); - indexingBufferSize = DEFAULT_INDEX_BUFFER_SIZE; + // We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing: + indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER; gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis(); versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE); updateVersionMapSize(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0e7d61b99ab..e274636dc2c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -259,9 +259,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { percolatorQueriesRegistry.enableRealTimePercolator(); } - - lastWriteNS = System.nanoTime(); - active.set(true); + + // We start up inactive + active.set(false); } public Store store() { @@ -1057,16 +1057,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. This returns true - * if the shard did in fact become inactive, else false. */ + * if the shard is inactive. */ public boolean checkIdle(long inactiveTimeNS) { - if (System.nanoTime() - lastWriteNS >= inactiveTimeNS && active.getAndSet(false)) { - updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); - logger.debug("shard is now inactive"); - indicesLifecycle.onShardInactive(this); - return true; + if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) { + boolean wasActive = active.getAndSet(false); + if (wasActive) { + updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); + logger.debug("shard is now inactive"); + indicesLifecycle.onShardInactive(this); + } } - return false; + return active.get() == false; } /** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index 2f5baa5b856..30ab8144e1e 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog.TranslogGeneration; +import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; @@ -43,8 +44,6 @@ public final class TranslogConfig { public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size"; public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval"; - public static final ByteSizeValue DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE = ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE); - private final TimeValue syncInterval; private final BigArrays bigArrays; private final ThreadPool threadPool; @@ -74,7 +73,7 @@ public final class TranslogConfig { this.threadPool = threadPool; this.bigArrays = bigArrays; this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name())); - this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE).bytes(); // Not really interesting, updated by IndexingMemoryController... + this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER).bytes(); // Not really interesting, updated by IndexingMemoryController... syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); if (syncInterval.millis() > 0 && threadPool != null) { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java index 5aac772eb06..d3d9e961a61 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -123,9 +123,9 @@ public class IndexingMemoryControllerTests extends ESTestCase { public void simulateIndexing(ShardId shardId) { lastIndexTimeNanos.put(shardId, currentTimeInNanos()); if (indexingBuffers.containsKey(shardId) == false) { - // First time we are indexing into this shard; start it off with default indexing buffer: - indexingBuffers.put(shardId, EngineConfig.DEFAULT_INDEX_BUFFER_SIZE); - translogBuffers.put(shardId, TranslogConfig.DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE); + // First time we are seeing this shard; start it off with inactive buffers as IndexShard does: + indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); + translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); } activeShards.add(shardId); forceCheck();