diff --git a/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index f4ddd75ba85..e81a4a7c86b 100644 --- a/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -125,6 +125,18 @@ final class CompositeIndexEventListener implements IndexEventListener { } } + @Override + public void onShardActive(IndexShard indexShard) { + for (IndexEventListener listener : listeners) { + try { + listener.onShardActive(indexShard); + } catch (Throwable t) { + logger.warn("[{}] failed to invoke on shard active callback", t, indexShard.shardId().getId()); + throw t; + } + } + } + @Override public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { for (IndexEventListener listener : listeners) { diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 03a6decb3e3..76a4d954d53 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -68,7 +68,7 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; /** * */ -public class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable{ +public final class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable{ private final IndexEventListener eventListener; private final AnalysisService analysisService; @@ -538,15 +538,15 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone final EngineFactory getEngineFactory() { return engineFactory; - } + } // pkg private for testing final IndexSearcherWrapper getSearcherWrapper() { return searcherWrapper; - } + } // pkg private for testing final IndexStore getIndexStore() { return indexStore; - } + } // pkg private for testing } diff --git a/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java b/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java index 12fe36e99f2..aeb57926c04 100644 --- a/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java +++ b/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java @@ -45,7 +45,6 @@ public final class NodeServicesProvider { private final TermVectorsService termVectorsService; private final IndicesWarmer warmer; private final BigArrays bigArrays; - private final IndexingMemoryController indexingMemoryController; private final Client client; private final IndicesQueriesRegistry indicesQueriesRegistry; private final ScriptService scriptService; @@ -53,13 +52,12 @@ public final class NodeServicesProvider { private final CircuitBreakerService circuitBreakerService; @Inject - public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, TermVectorsService termVectorsService, @Nullable IndicesWarmer warmer, BigArrays bigArrays, IndexingMemoryController indexingMemoryController, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) { + public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, TermVectorsService termVectorsService, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) { this.threadPool = threadPool; this.indicesQueryCache = indicesQueryCache; this.termVectorsService = termVectorsService; this.warmer = warmer; this.bigArrays = bigArrays; - this.indexingMemoryController = indexingMemoryController; this.client = client; this.indicesQueriesRegistry = indicesQueriesRegistry; this.scriptService = scriptService; @@ -97,10 +95,6 @@ public final class NodeServicesProvider { return scriptService; } - public IndexingMemoryController getIndexingMemoryController() { - return indexingMemoryController; - } - public IndicesFieldDataCache getIndicesFieldDataCache() { return indicesFieldDataCache; } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index 7c774aafe81..0308b7e2a33 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -23,7 +23,6 @@ import org.apache.lucene.util.Accountable; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.fielddata.plain.BytesBinaryDVIndexFieldData; @@ -159,7 +158,6 @@ public class IndexFieldDataService extends AbstractIndexComponent implements Clo private volatile IndexFieldDataCache.Listener listener = DEFAULT_NOOP_LISTENER; - @Inject public IndexFieldDataService(IndexSettings indexSettings, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService, MapperService mapperService) { super(indexSettings); diff --git a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 3f046d9a429..beb84fefeb5 100755 --- a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -33,7 +33,6 @@ import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.compress.CompressedXContent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -105,7 +104,6 @@ public class MapperService extends AbstractIndexComponent implements Closeable { private volatile Set parentTypes = emptySet(); - @Inject public MapperService(IndexSettings indexSettings, AnalysisService analysisService, SimilarityService similarityService) { super(indexSettings); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index f361e2d6175..9a55b9b6161 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -89,6 +89,13 @@ public interface IndexEventListener { */ default void onShardInactive(IndexShard indexShard) {} + /** + * Called when a shard is marked as active ie. was previously inactive and is now active again. + * + * @param indexShard The shard that was marked active + */ + default void onShardActive(IndexShard indexShard) {} + /** * Called before the index gets created. Note that this is also called * when the index is created on data nodes diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index aa3ca5167cd..ef2552bad23 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -198,8 +198,6 @@ public class IndexShard extends AbstractIndexShardComponent { * IndexingMemoryController}). */ private final AtomicBoolean active = new AtomicBoolean(); - private final IndexingMemoryController indexingMemoryController; - public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, @@ -248,7 +246,6 @@ public class IndexShard extends AbstractIndexShardComponent { cachingPolicy = new UsageTrackingQueryCachingPolicy(); } - this.indexingMemoryController = provider.getIndexingMemoryController(); this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); this.flushThresholdOperations = this.indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE)); this.flushThresholdSize = this.indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)); @@ -921,7 +918,7 @@ public class IndexShard extends AbstractIndexShardComponent { // We are currently inactive, but a new write operation just showed up, so we now notify IMC // to wake up and fix our indexing buffer. We could do this async instead, but cost should // be low, and it's rare this happens. - indexingMemoryController.forceCheck(); + indexEventListener.onShardActive(this); } } @@ -1468,7 +1465,7 @@ public class IndexShard extends AbstractIndexShardComponent { final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel); return new EngineConfig(shardId, threadPool, indexingService, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, - mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexingMemoryController.getInactiveTime()); + mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexSettings.getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5))); } private static class IndexShardOperationCounter extends AbstractRefCounted { diff --git a/core/src/main/java/org/elasticsearch/index/similarity/SimilarityService.java b/core/src/main/java/org/elasticsearch/index/similarity/SimilarityService.java index ce1a64b1319..1d08683f47b 100644 --- a/core/src/main/java/org/elasticsearch/index/similarity/SimilarityService.java +++ b/core/src/main/java/org/elasticsearch/index/similarity/SimilarityService.java @@ -33,7 +33,7 @@ import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; -public class SimilarityService extends AbstractIndexComponent { +public final class SimilarityService extends AbstractIndexComponent { public final static String DEFAULT_SIMILARITY = "default"; private final Similarity defaultSimilarity; diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java index eea4fb1753f..30ec403942f 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java @@ -63,7 +63,6 @@ public final class IndicesWarmer extends AbstractComponent { if (shard.state() == IndexShardState.CLOSED) { return; } - final IndexMetaData indexMetaData = settings.getIndexMetaData(); final Settings indexSettings = settings.getSettings(); if (!indexSettings.getAsBoolean(INDEX_WARMER_ENABLED, settings.getNodeSettings().getAsBoolean(INDEX_WARMER_ENABLED, true))) { return; diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index bddaee5ab88..74f11b4b929 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -53,6 +53,7 @@ import org.elasticsearch.index.shard.*; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.flush.SyncedFlushService; +import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoveryState; @@ -112,9 +113,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { +public class IndexingMemoryController extends AbstractLifecycleComponent implements IndexEventListener { /** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */ public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size"; @@ -426,4 +427,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent