diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 4086bebfa88..913c4989d40 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -35,7 +35,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.indices.memory.IndexingMemoryController; +import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.TimeUnit; @@ -192,7 +192,7 @@ public final class EngineConfig { } /** - * Returns the initial index buffer size. This setting is only read on startup and otherwise controlled by {@link org.elasticsearch.indices.memory.IndexingMemoryController} + * Returns the initial index buffer size. This setting is only read on startup and otherwise controlled by {@link IndexingMemoryController} */ public ByteSizeValue getIndexingBufferSize() { return indexingBufferSize; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index db5e1f7e4a5..e4f656dd558 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -111,7 +111,7 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.cache.query.IndicesQueryCache; -import org.elasticsearch.indices.memory.IndexingMemoryController; +import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.percolator.PercolatorService; diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java similarity index 89% rename from core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java rename to core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index a72c115835c..d84c3b00255 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -17,10 +17,9 @@ * under the License. */ -package org.elasticsearch.indices.memory; +package org.elasticsearch.indices; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -32,16 +31,16 @@ import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.concurrent.ScheduledFuture; -public class IndexingMemoryController extends AbstractLifecycleComponent implements IndexEventListener { +public class IndexingMemoryController extends AbstractComponent implements IndexEventListener, Closeable { /** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */ public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size"; @@ -70,10 +69,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of( IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); private final ShardsIndicesStatusChecker statusChecker; - @Inject - public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { + IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes()); } // for testing - protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) { + IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) { super(settings); - this.threadPool = threadPool; this.indicesService = indicesService; ByteSizeValue indexingBuffer; @@ -131,29 +124,24 @@ public class IndexingMemoryController extends AbstractLifecycleComponent scheduleTask(ThreadPool threadPool) { // it's fine to run it on the scheduler thread, no busy work - this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval); + return threadPool.scheduleWithFixedDelay(statusChecker, interval); } @Override - protected void doStop() { + public void close() { FutureUtils.cancel(scheduler); - scheduler = null; - } - - @Override - protected void doClose() { } /** * returns the current budget for the total amount of indexing buffers of * active shards on this node */ - public ByteSizeValue indexingBufferSize() { + ByteSizeValue indexingBufferSize() { return indexingBuffer; } @@ -188,7 +176,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent i private final OldShardsStats oldShardsStats = new OldShardsStats(); private final IndexStoreConfig indexStoreConfig; private final MapperRegistry mapperRegistry; + private final IndexingMemoryController indexingMemoryController; @Override protected void doStart() { @@ -114,7 +116,7 @@ public class IndicesService extends AbstractLifecycleComponent i public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, ClusterSettings clusterSettings, AnalysisRegistry analysisRegistry, IndicesQueriesRegistry indicesQueriesRegistry, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, MapperRegistry mapperRegistry) { + ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool) { super(settings); this.pluginsService = pluginsService; this.nodeEnv = nodeEnv; @@ -127,7 +129,7 @@ public class IndicesService extends AbstractLifecycleComponent i this.mapperRegistry = mapperRegistry; clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType); clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle); - + indexingMemoryController = new IndexingMemoryController(settings, threadPool, this); } @Override @@ -161,7 +163,7 @@ public class IndicesService extends AbstractLifecycleComponent i @Override protected void doClose() { - IOUtils.closeWhileHandlingException(analysisRegistry); + IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController); } /** @@ -291,6 +293,7 @@ public class IndicesService extends AbstractLifecycleComponent i final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry); pluginsService.onIndexModule(indexModule); + indexModule.addIndexEventListener(indexingMemoryController); for (IndexEventListener listener : builtInListeners) { indexModule.addIndexEventListener(listener); } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 8a213898b6c..9357de7b1eb 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -63,7 +63,6 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.flush.SyncedFlushService; -import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoveryState; @@ -130,9 +129,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent scheduleTask(ThreadPool threadPool) { + return null; + } } public void testShardAdditionAndRemoval() {