diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 3fc2f4b631e..0c883ccb377 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -105,7 +105,7 @@ public class TransportClusterStatsAction extends TransportNodesAction { private final IndicesService indicesService; - private final IndicesRequestCache indicesRequestCache; @Inject public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, IndicesService indicesService, - IndicesRequestCache indicesQueryCache, ActionFilters actionFilters, + TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, ClearIndicesCacheRequest::new, ThreadPool.Names.MANAGEMENT); this.indicesService = indicesService; - this.indicesRequestCache = indicesQueryCache; } @Override @@ -101,7 +98,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc } if (request.requestCache()) { clearedAtLeastOne = true; - indicesRequestCache.clear(shard); + indicesService.getIndicesRequestCache().clear(shard); } if (request.recycler()) { logger.debug("Clear CacheRecycler on index [{}]", service.index()); @@ -117,7 +114,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc } else { service.cache().clear("api"); service.fieldData().clear(); - indicesRequestCache.clear(shard); + indicesService.getIndicesRequestCache().clear(shard); } } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index 85644e8523e..47fb8d8356a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.cache.request.RequestCacheStats; import org.elasticsearch.index.engine.SegmentsStats; @@ -40,10 +41,13 @@ import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.suggest.stats.SuggestStats; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.warmer.WarmerStats; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.search.suggest.completion.CompletionStats; import java.io.IOException; @@ -122,7 +126,8 @@ public class CommonStats implements Streamable, ToXContent { } - public CommonStats(IndexShard indexShard, CommonStatsFlags flags) { + public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) { + CommonStatsFlags.Flag[] setFlags = flags.getFlags(); for (CommonStatsFlags.Flag flag : setFlags) { @@ -155,7 +160,7 @@ public class CommonStats implements Streamable, ToXContent { warmer = indexShard.warmerStats(); break; case QueryCache: - queryCache = indexShard.queryCacheStats(); + queryCache = indicesQueryCache.getStats(indexShard.shardId()); break; case FieldData: fieldData = indexShard.fieldDataStats(flags.fieldDataFields()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index d5de67da478..4bed5f918a4 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -162,6 +162,6 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< flags.set(CommonStatsFlags.Flag.Recovery); } - return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()); + return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), indexShard.commitStats()); } } diff --git a/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 1ee1f1cc4a9..acc0d3f8370 100644 --- a/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -258,4 +258,16 @@ final class CompositeIndexEventListener implements IndexEventListener { } } } + + @Override + public void onStoreClosed(ShardId shardId) { + for (IndexEventListener listener : listeners) { + try { + listener.onStoreClosed(shardId); + } catch (Throwable t) { + logger.warn("failed to invoke on store closed", t); + throw t; + } + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index b45f29f752a..f23441fa908 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -241,7 +241,7 @@ public final class IndexModule { IndexSearcherWrapper newWrapper(final IndexService indexService); } - public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache, + public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache, IndexingOperationListener... listeners) throws IOException { IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get(); IndexEventListener eventListener = freeze(); @@ -263,7 +263,7 @@ public final class IndexModule { indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate); final String queryCacheType = indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING); final BiFunction queryCacheProvider = queryCaches.get(queryCacheType); - final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache()); + final QueryCache queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache); return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(), servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, listeners); } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 9ffb6c4d56c..ffac5d2a21b 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -321,8 +321,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC warmer.warm(searcher, shard, IndexService.this.indexSettings, toLevel); } }; - - store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId))); + store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId))); if (useShadowEngine(primary, indexSettings)) { indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer); // no indexing listeners - shadow engines don't index } else { diff --git a/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java b/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java index 4bb25214708..fa245352ae7 100644 --- a/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java +++ b/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java @@ -23,7 +23,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.query.IndicesQueriesRegistry; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -36,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool; public final class NodeServicesProvider { private final ThreadPool threadPool; - private final IndicesQueryCache indicesQueryCache; private final BigArrays bigArrays; private final Client client; private final IndicesQueriesRegistry indicesQueriesRegistry; @@ -44,9 +42,8 @@ public final class NodeServicesProvider { private final CircuitBreakerService circuitBreakerService; @Inject - public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) { + public NodeServicesProvider(ThreadPool threadPool, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) { this.threadPool = threadPool; - this.indicesQueryCache = indicesQueryCache; this.bigArrays = bigArrays; this.client = client; this.indicesQueriesRegistry = indicesQueriesRegistry; @@ -58,10 +55,6 @@ public final class NodeServicesProvider { return threadPool; } - public IndicesQueryCache getIndicesQueryCache() { - return indicesQueryCache; - } - public BigArrays getBigArrays() { return bigArrays; } public Client getClient() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 8d3523a18b1..f5c6dca7d2f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -177,4 +177,12 @@ public interface IndexEventListener { */ default void beforeIndexAddedToCluster(Index index, Settings indexSettings) { } + + /** + * Called when the given shards store is closed. The store is closed once all resource have been released on the store. + * This implies that all index readers are closed and no recoveries are running. + * + * @param shardId the shard ID the store belongs to + */ + default void onStoreClosed(ShardId shardId) {} } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f2f1add2594..0f47eec6b25 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -58,7 +58,6 @@ import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache; -import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.cache.request.ShardRequestCache; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.CommitStats; @@ -105,7 +104,6 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndexingMemoryController; -import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.percolator.PercolatorService; @@ -154,7 +152,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final SimilarityService similarityService; private final EngineConfig engineConfig; private final TranslogConfig translogConfig; - private final IndicesQueryCache indicesQueryCache; private final IndexEventListener indexEventListener; private final IndexSettings idxSettings; @@ -227,7 +224,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.getService = new ShardGetService(indexSettings, this, mapperService); this.searchService = new ShardSearchStats(slowLog); this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings); - this.indicesQueryCache = provider.getIndicesQueryCache(); this.shardQueryCache = new ShardRequestCache(shardId, indexSettings); this.shardFieldData = new ShardFieldData(); this.indexFieldDataService = indexFieldDataService; @@ -652,10 +648,6 @@ public class IndexShard extends AbstractIndexShardComponent { return shardWarmerService.stats(); } - public QueryCacheStats queryCacheStats() { - return indicesQueryCache.getStats(shardId); - } - public FieldDataStats fieldDataStats(String... fields) { return shardFieldData.stats(fields); } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index f14d708c0f9..b94ef19ec23 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -55,8 +55,6 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.mapper.ip.IpFieldMapper; import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.index.percolator.PercolatorFieldMapper; -import org.elasticsearch.indices.cache.query.IndicesQueryCache; -import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.mapper.MapperRegistry; @@ -160,8 +158,6 @@ public class IndicesModule extends AbstractModule { bind(IndicesStore.class).asEagerSingleton(); bind(IndicesClusterStateService.class).asEagerSingleton(); bind(SyncedFlushService.class).asEagerSingleton(); - bind(IndicesQueryCache.class).asEagerSingleton(); - bind(IndicesRequestCache.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton(); bind(UpdateHelper.class).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index f092789ae30..7b2bc89e646 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -71,6 +71,8 @@ import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; +import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.query.IndicesQueriesRegistry; @@ -124,6 +126,8 @@ public class IndicesService extends AbstractLifecycleComponent i private final MapperRegistry mapperRegistry; private final IndexingMemoryController indexingMemoryController; private final TimeValue cleanInterval; + private final IndicesRequestCache indicesRequestCache; + private final IndicesQueryCache indicesQueryCache; @Override protected void doStart() { @@ -146,6 +150,8 @@ public class IndicesService extends AbstractLifecycleComponent i this.indicesQueriesRegistry = indicesQueriesRegistry; this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; + this.indicesRequestCache = new IndicesRequestCache(settings, threadPool); + this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType); clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle); @@ -196,7 +202,7 @@ public class IndicesService extends AbstractLifecycleComponent i @Override protected void doClose() { - IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner); + IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner, indicesRequestCache, indicesQueryCache); } /** @@ -247,7 +253,7 @@ public class IndicesService extends AbstractLifecycleComponent i if (indexShard.routingEntry() == null) { continue; } - IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()) }); + IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesQueryCache, indexShard, flags), indexShard.commitStats()) }); if (!statsByShard.containsKey(indexService.index())) { statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats)); } else { @@ -348,10 +354,17 @@ public class IndicesService extends AbstractLifecycleComponent i for (IndexEventListener listener : builtInListeners) { indexModule.addIndexEventListener(listener); } + final IndexEventListener onStoreClose = new IndexEventListener() { + @Override + public void onStoreClosed(ShardId shardId) { + indicesQueryCache.onClose(shardId); + } + }; + indexModule.addIndexEventListener(onStoreClose); indexModule.addIndexEventListener(oldShardsStats); final IndexEventListener listener = indexModule.freeze(); listener.beforeIndexCreated(index, idxSettings.getSettings()); - final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indicesFieldDataCache, indexingMemoryController); + final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingMemoryController); boolean success = false; try { assert indexService.getIndexEventListener() == listener; @@ -420,6 +433,14 @@ public class IndicesService extends AbstractLifecycleComponent i return circuitBreakerService; } + public IndicesRequestCache getIndicesRequestCache() { + return indicesRequestCache; + } + + public IndicesQueryCache getIndicesQueryCache() { + return indicesQueryCache; + } + static class OldShardsStats implements IndexEventListener { final SearchStats searchStats = new SearchStats(); diff --git a/core/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java b/core/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java index 58c2cd5a953..718f4db9c4e 100644 --- a/core/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java +++ b/core/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java @@ -64,7 +64,6 @@ public class IndicesQueryCache extends AbstractComponent implements QueryCache, // See onDocIdSetEviction for more info private final Map stats2 = new IdentityHashMap<>(); - @Inject public IndicesQueryCache(Settings settings) { super(settings); final ByteSizeValue size = INDICES_CACHE_QUERY_SIZE_SETTING.get(settings); diff --git a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java index 36ac787855b..d58c1c13994 100644 --- a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java +++ b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java @@ -45,6 +45,8 @@ import org.elasticsearch.common.unit.MemorySizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.search.internal.SearchContext; @@ -53,6 +55,7 @@ import org.elasticsearch.search.query.QueryPhase; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -74,7 +77,7 @@ import java.util.concurrent.TimeUnit; * There are still several TODOs left in this class, some easily addressable, some more complex, but the support * is functional. */ -public class IndicesRequestCache extends AbstractComponent implements RemovalListener { +public class IndicesRequestCache extends AbstractComponent implements RemovalListener, Closeable { /** * A setting to enable or disable request caching on an index level. Its dynamic by default @@ -89,7 +92,6 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis private static final Set CACHEABLE_SEARCH_TYPES = EnumSet.of(SearchType.QUERY_THEN_FETCH, SearchType.QUERY_AND_FETCH); private final ThreadPool threadPool; - private final ClusterService clusterService; private final TimeValue cleanInterval; private final Reaper reaper; @@ -104,18 +106,13 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis private volatile Cache cache; - @Inject - public IndicesRequestCache(Settings settings, ClusterService clusterService, ThreadPool threadPool) { + public IndicesRequestCache(Settings settings, ThreadPool threadPool) { super(settings); - this.clusterService = clusterService; this.threadPool = threadPool; this.cleanInterval = INDICES_CACHE_REQUEST_CLEAN_INTERVAL.get(settings); - this.size = INDICES_CACHE_QUERY_SIZE.get(settings); - this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; buildCache(); - this.reaper = new Reaper(); threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, reaper); } @@ -123,10 +120,8 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis private void buildCache() { long sizeInBytes = size.bytes(); - CacheBuilder cacheBuilder = CacheBuilder.builder() .setMaximumWeight(sizeInBytes).weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()).removalListener(this); - // cacheBuilder.concurrencyLevel(concurrencyLevel); if (expire != null) { cacheBuilder.setExpireAfterAccess(TimeUnit.MILLISECONDS.toNanos(expire.millis())); @@ -135,6 +130,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis cache = cacheBuilder.build(); } + @Override public void close() { reaper.close(); cache.invalidateAll(); @@ -174,21 +170,17 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) { return false; } - - IndexMetaData index = clusterService.state().getMetaData().index(request.index()); - if (index == null) { // in case we didn't yet have the cluster state, or it just got deleted - return false; - } + IndexSettings settings = context.indexShard().getIndexSettings(); // if not explicitly set in the request, use the index setting, if not, use the request if (request.requestCache() == null) { - if (INDEX_CACHE_REQUEST_ENABLED_SETTING.get(index.getSettings()) == false) { + if (settings.getValue(INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) { return false; } - } else if (!request.requestCache()) { + } else if (request.requestCache() == false) { return false; } // if the reader is not a directory reader, we can't get the version from it - if (!(context.searcher().getIndexReader() instanceof DirectoryReader)) { + if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) { return false; } // if now in millis is used (or in the future, a more generic "isDeterministic" flag diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 88d21764abb..ee523e975a1 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -70,9 +70,7 @@ import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.CircuitBreakerModule; -import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; -import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.monitor.MonitorService; @@ -391,7 +389,6 @@ public class Node implements Closeable { toClose.add(injector.getInstance(IndicesTTLService.class)); toClose.add(injector.getInstance(IndicesService.class)); // close filter/fielddata caches after indices - toClose.add(injector.getInstance(IndicesQueryCache.class)); toClose.add(injector.getInstance(IndicesStore.class)); toClose.add(() ->stopWatch.stop().start("routing")); toClose.add(injector.getInstance(RoutingService.class)); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index e0b30a2e346..4afef7117f5 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -149,8 +149,7 @@ public class SearchService extends AbstractLifecycleComponent imp @Inject public SearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, - ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, - IndicesRequestCache indicesQueryCache) { + ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { super(settings); this.parseFieldMatcher = new ParseFieldMatcher(settings); this.threadPool = threadPool; @@ -162,7 +161,7 @@ public class SearchService extends AbstractLifecycleComponent imp this.dfsPhase = dfsPhase; this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; - this.indicesQueryCache = indicesQueryCache; + this.indicesQueryCache = indicesService.getIndicesRequestCache(); TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings); this.defaultKeepAlive = DEFAULT_KEEPALIVE_SETTING.get(settings).millis(); diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 0b0691bc588..55c0c85a889 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -88,6 +88,8 @@ public class IndexModuleTests extends ESTestCase { private Environment environment; private NodeEnvironment nodeEnvironment; private NodeServicesProvider nodeServicesProvider; + private IndicesQueryCache indicesQueryCache; + private IndexService.ShardStoreDeleter deleter = new IndexService.ShardStoreDeleter() { @Override public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException { @@ -103,7 +105,6 @@ public class IndexModuleTests extends ESTestCase { static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException { // TODO this can be used in other place too - lets first refactor the IndicesQueriesRegistry ThreadPool threadPool = new ThreadPool("test"); - IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings); CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool); BigArrays bigArrays = new BigArrays(recycler, circuitBreakerService); @@ -114,13 +115,14 @@ public class IndexModuleTests extends ESTestCase { ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry); ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings); IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap()); - return new NodeServicesProvider(threadPool, indicesQueryCache, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService); + return new NodeServicesProvider(threadPool, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService); } @Override public void setUp() throws Exception { super.setUp(); settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); + indicesQueryCache = new IndicesQueryCache(settings); indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); index = indexSettings.getIndex(); environment = new Environment(settings); @@ -133,17 +135,15 @@ public class IndexModuleTests extends ESTestCase { public void tearDown() throws Exception { super.tearDown(); nodeEnvironment.close(); - nodeServicesProvider.getThreadPool().shutdown(); - if (nodeServicesProvider.getThreadPool().awaitTermination(10, TimeUnit.SECONDS) == false) { - nodeServicesProvider.getThreadPool().shutdownNow(); - } + indicesQueryCache.close(); + ThreadPool.terminate(nodeServicesProvider.getThreadPool(), 10, TimeUnit.SECONDS); } public void testWrapperIsBound() throws IOException { IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); module.setSearcherWrapper((s) -> new Wrapper()); module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class)); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener)); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.getSearcherWrapper() instanceof Wrapper); assertSame(indexService.getEngineFactory(), module.engineFactory.get()); indexService.close("simon says", false); @@ -158,10 +158,9 @@ public class IndexModuleTests extends ESTestCase { .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store") .build(); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings); - final Index index = indexSettings.getIndex(); IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); module.addIndexStore("foo_store", FooStore::new); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener)); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.getIndexStore() instanceof FooStore); try { module.addIndexStore("foo_store", FooStore::new); @@ -184,7 +183,7 @@ public class IndexModuleTests extends ESTestCase { IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); Consumer listener = (s) -> {}; module.addIndexEventListener(eventListener); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, this.listener)); IndexSettings x = indexService.getIndexSettings(); assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap()); @@ -209,7 +208,7 @@ public class IndexModuleTests extends ESTestCase { } - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); assertSame(booleanSetting, indexService.getIndexSettings().getScopedSettings().get(booleanSetting.getKey())); @@ -236,7 +235,7 @@ public class IndexModuleTests extends ESTestCase { } }); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); SimilarityService similarityService = indexService.similarityService(); assertNotNull(similarityService.getSimilarity("my_similarity")); @@ -254,7 +253,7 @@ public class IndexModuleTests extends ESTestCase { .build(); IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); try { - module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); } catch (IllegalArgumentException ex) { assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage()); @@ -269,7 +268,7 @@ public class IndexModuleTests extends ESTestCase { .build(); IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); try { - module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); } catch (IllegalArgumentException ex) { assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage()); @@ -317,7 +316,7 @@ public class IndexModuleTests extends ESTestCase { assertEquals(e.getMessage(), "Can't register the same [query_cache] more than once for [custom]"); } - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.cache().query() instanceof CustomQueryCache); indexService.close("simon says", false); @@ -328,7 +327,7 @@ public class IndexModuleTests extends ESTestCase { .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.cache().query() instanceof IndexQueryCache); indexService.close("simon says", false); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e5f0ade2d50..b5a38553a8c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -564,7 +564,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService("test"); IndexShard shard = test.getShardOrNull(0); - ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(shard, new CommonStatsFlags()), shard.commitStats()); + ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), shard, new CommonStatsFlags()), shard.commitStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 7cc583273b9..e9d78e4e58a 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.dfs.DfsPhase; @@ -68,9 +67,9 @@ public class MockSearchService extends SearchService { @Inject public MockSearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, - DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesRequestCache indicesQueryCache) { + DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { super(settings, clusterSettings, clusterService, indicesService, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase, - queryPhase, fetchPhase, indicesQueryCache); + queryPhase, fetchPhase); } @Override