diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index a6babce961d..e4b92d82d20 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -60,6 +60,7 @@ import org.elasticsearch.gateway.PrimaryShardAllocator; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.store.IndexStoreConfig; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.analysis.HunspellService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.cache.query.IndicesQueryCache; @@ -290,7 +291,7 @@ public final class ClusterSettings extends AbstractScopedSettings { ScriptService.SCRIPT_CACHE_SIZE_SETTING, ScriptService.SCRIPT_CACHE_EXPIRE_SETTING, ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING, - IndicesFieldDataCache.INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING, + IndicesService.INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING, IndicesFieldDataCache.INDICES_FIELDDATA_CACHE_SIZE_KEY, IndicesRequestCache.INDICES_CACHE_QUERY_SIZE, IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE, diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index 4688fba5034..fada5f42830 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.cache.query.IndicesQueryCache; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import java.io.IOException; @@ -240,7 +241,7 @@ public final class IndexModule { IndexSearcherWrapper newWrapper(final IndexService indexService); } - public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry, + public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache, IndexingOperationListener... listeners) throws IOException { IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get(); IndexEventListener eventListener = freeze(); @@ -264,7 +265,7 @@ public final class IndexModule { final BiFunction queryCacheProvider = queryCaches.get(queryCacheType); final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache()); return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(), - servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, listeners); + servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, listeners); } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 8c87b2b5606..af800caf1c8 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -76,6 +76,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.AliasFilterParsingException; import org.elasticsearch.indices.InvalidAliasNameException; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.threadpool.ThreadPool; @@ -122,13 +123,14 @@ public final class IndexService extends AbstractIndexComponent implements IndexC IndexEventListener eventListener, IndexModule.IndexSearcherWrapperFactory wrapperFactory, MapperRegistry mapperRegistry, + IndicesFieldDataCache indicesFieldDataCache, IndexingOperationListener... listenersIn) throws IOException { super(indexSettings); this.indexSettings = indexSettings; this.analysisService = registry.build(indexSettings); this.similarityService = similarityService; this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry, IndexService.this::getQueryShardContext); - this.indexFieldData = new IndexFieldDataService(indexSettings, nodeServicesProvider.getIndicesFieldDataCache(), nodeServicesProvider.getCircuitBreakerService(), mapperService); + this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, nodeServicesProvider.getCircuitBreakerService(), mapperService); this.shardStoreDeleter = shardStoreDeleter; this.eventListener = eventListener; this.nodeEnv = nodeEnv; diff --git a/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java b/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java index 36d3fdc5dce..d2b4894a5e6 100644 --- a/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java +++ b/core/src/main/java/org/elasticsearch/index/NodeServicesProvider.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.cache.query.IndicesQueryCache; -import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.query.IndicesQueriesRegistry; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -45,11 +44,10 @@ public final class NodeServicesProvider { private final Client client; private final IndicesQueriesRegistry indicesQueriesRegistry; private final ScriptService scriptService; - private final IndicesFieldDataCache indicesFieldDataCache; private final CircuitBreakerService circuitBreakerService; @Inject - public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) { + public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) { this.threadPool = threadPool; this.indicesQueryCache = indicesQueryCache; this.warmer = warmer; @@ -57,7 +55,6 @@ public final class NodeServicesProvider { this.client = client; this.indicesQueriesRegistry = indicesQueriesRegistry; this.scriptService = scriptService; - this.indicesFieldDataCache = indicesFieldDataCache; this.circuitBreakerService = circuitBreakerService; } @@ -87,10 +84,6 @@ public final class NodeServicesProvider { return scriptService; } - public IndicesFieldDataCache getIndicesFieldDataCache() { - return indicesFieldDataCache; - } - public CircuitBreakerService getCircuitBreakerService() { return circuitBreakerService; } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java index 7640a9be200..68c05aba3fe 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java @@ -48,12 +48,12 @@ public interface IndexFieldDataCache { /** * Called after the fielddata is loaded during the cache phase */ - void onCache(ShardId shardId, String fieldName, FieldDataType fieldDataType, Accountable ramUsage); + default void onCache(ShardId shardId, String fieldName, FieldDataType fieldDataType, Accountable ramUsage){} /** * Called after the fielddata is unloaded */ - void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes); + default void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes){} } class None implements IndexFieldDataCache { diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index 256aa72a105..e66cd3e721c 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -55,12 +55,9 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.mapper.ip.IpFieldMapper; import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.index.percolator.PercolatorFieldMapper; -import org.elasticsearch.index.termvectors.TermVectorsService; import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; -import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; -import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -165,13 +162,11 @@ public class IndicesModule extends AbstractModule { bind(SyncedFlushService.class).asEagerSingleton(); bind(IndicesQueryCache.class).asEagerSingleton(); bind(IndicesRequestCache.class).asEagerSingleton(); - bind(IndicesFieldDataCache.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton(); bind(IndicesWarmer.class).asEagerSingleton(); bind(UpdateHelper.class).asEagerSingleton(); bind(MetaDataIndexUpgradeService.class).asEagerSingleton(); - bind(IndicesFieldDataCacheListener.class).asEagerSingleton(); bind(NodeServicesProvider.class).asEagerSingleton(); } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index bb61fed4362..876411964b4 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices; import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.util.Accountable; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; @@ -33,11 +34,15 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -51,6 +56,8 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.fielddata.FieldDataType; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.merge.MergeStats; @@ -63,6 +70,8 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStoreConfig; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.query.IndicesQueriesRegistry; import org.elasticsearch.plugins.PluginsService; @@ -81,6 +90,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import static java.util.Collections.emptyMap; @@ -94,6 +104,7 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; public class IndicesService extends AbstractLifecycleComponent implements Iterable, IndexService.ShardStoreDeleter { public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout"; + public static final Setting INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER); private final PluginsService pluginsService; private final NodeEnvironment nodeEnv; private final TimeValue shardsClosedTimeout; @@ -102,23 +113,31 @@ public class IndicesService extends AbstractLifecycleComponent i private final ClusterService clusterService; private final IndexNameExpressionResolver indexNameExpressionResolver; private final IndexScopedSettings indexScopeSetting; + private final IndicesFieldDataCache indicesFieldDataCache; + private final FieldDataCacheCleaner fieldDataCacheCleaner; + private final ThreadPool threadPool; + private final CircuitBreakerService circuitBreakerService; private volatile Map indices = emptyMap(); private final Map> pendingDeletes = new HashMap<>(); private final OldShardsStats oldShardsStats = new OldShardsStats(); private final IndexStoreConfig indexStoreConfig; private final MapperRegistry mapperRegistry; private final IndexingMemoryController indexingMemoryController; + private final TimeValue cleanInterval; @Override protected void doStart() { + // Start thread that will manage cleaning the field data cache periodically + threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.fieldDataCacheCleaner); } @Inject public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, ClusterSettings clusterSettings, AnalysisRegistry analysisRegistry, IndicesQueriesRegistry indicesQueriesRegistry, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool, IndexScopedSettings indexScopedSettings) { + ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool, IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService) { super(settings); + this.threadPool = threadPool; this.pluginsService = pluginsService; this.nodeEnv = nodeEnv; this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); @@ -132,6 +151,18 @@ public class IndicesService extends AbstractLifecycleComponent i clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle); indexingMemoryController = new IndexingMemoryController(settings, threadPool, this); this.indexScopeSetting = indexScopedSettings; + this.circuitBreakerService = circuitBreakerService; + this.indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() { + @Override + public void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { + assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]"; + circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes); + } + }); + this.cleanInterval = INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING.get(settings); + this.fieldDataCacheCleaner = new FieldDataCacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval); + + } @Override @@ -165,7 +196,7 @@ public class IndicesService extends AbstractLifecycleComponent i @Override protected void doClose() { - IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController); + IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner); } /** @@ -320,7 +351,7 @@ public class IndicesService extends AbstractLifecycleComponent i indexModule.addIndexEventListener(oldShardsStats); final IndexEventListener listener = indexModule.freeze(); listener.beforeIndexCreated(index, idxSettings.getSettings()); - final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indexingMemoryController); + final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indicesFieldDataCache, indexingMemoryController); boolean success = false; try { assert indexService.getIndexEventListener() == listener; @@ -381,6 +412,14 @@ public class IndicesService extends AbstractLifecycleComponent i } } + public IndicesFieldDataCache getIndicesFieldDataCache() { + return indicesFieldDataCache; + } + + public CircuitBreakerService getCircuitBreakerService() { + return circuitBreakerService; + } + static class OldShardsStats implements IndexEventListener { final SearchStats searchStats = new SearchStats(); @@ -760,4 +799,51 @@ public class IndicesService extends AbstractLifecycleComponent i public AnalysisRegistry getAnalysis() { return analysisRegistry; } + + /** + * FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache + * periodically. In this case it is the field data cache, because a cache that + * has an entry invalidated may not clean up the entry if it is not read from + * or written to after invalidation. + */ + private final static class FieldDataCacheCleaner implements Runnable, Releasable { + + private final IndicesFieldDataCache cache; + private final ESLogger logger; + private final ThreadPool threadPool; + private final TimeValue interval; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public FieldDataCacheCleaner(IndicesFieldDataCache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) { + this.cache = cache; + this.logger = logger; + this.threadPool = threadPool; + this.interval = interval; + } + + @Override + public void run() { + long startTimeNS = System.nanoTime(); + if (logger.isTraceEnabled()) { + logger.trace("running periodic field data cache cleanup"); + } + try { + this.cache.getCache().refresh(); + } catch (Exception e) { + logger.warn("Exception during periodic field data cache cleanup:", e); + } + if (logger.isTraceEnabled()) { + logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)); + } + // Reschedule itself to run again if not closed + if (closed.get() == false) { + threadPool.schedule(interval, ThreadPool.Names.SAME, this); + } + } + + @Override + public void close() { + closed.compareAndSet(false, true); + } + } } diff --git a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 144f8b7f775..0a3f063dfcc 100644 --- a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -30,14 +30,12 @@ import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.cache.RemovalListener; import org.elasticsearch.common.cache.RemovalNotification; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.fielddata.AtomicFieldData; import org.elasticsearch.index.fielddata.FieldDataType; @@ -45,7 +43,6 @@ import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; @@ -55,20 +52,12 @@ import java.util.function.ToLongBiFunction; */ public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener, Releasable{ - public static final Setting INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER); public static final Setting INDICES_FIELDDATA_CACHE_SIZE_KEY = Setting.byteSizeSetting("indices.fielddata.cache.size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER); - - - private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; + private final IndexFieldDataCache.Listener indicesFieldDataCacheListener; private final Cache cache; - private final TimeValue cleanInterval; - private final ThreadPool threadPool; - private volatile boolean closed = false; - @Inject - public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener, ThreadPool threadPool) { + public IndicesFieldDataCache(Settings settings, IndexFieldDataCache.Listener indicesFieldDataCacheListener) { super(settings); - this.threadPool = threadPool; this.indicesFieldDataCacheListener = indicesFieldDataCacheListener; final long sizeInBytes = INDICES_FIELDDATA_CACHE_SIZE_KEY.get(settings).bytes(); CacheBuilder cacheBuilder = CacheBuilder.builder() @@ -76,19 +65,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL if (sizeInBytes > 0) { cacheBuilder.setMaximumWeight(sizeInBytes).weigher(new FieldDataWeigher()); } - cache = cacheBuilder.build(); - - this.cleanInterval = INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING.get(settings); - // Start thread that will manage cleaning the field data cache periodically - threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, - new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval)); } @Override public void close() { cache.invalidateAll(); - this.closed = true; } public IndexFieldDataCache buildIndexFieldDataCache(IndexFieldDataCache.Listener listener, Index index, String fieldName, FieldDataType fieldDataType) { @@ -260,44 +242,5 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL } } - /** - * FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache - * periodically. In this case it is the field data cache, because a cache that - * has an entry invalidated may not clean up the entry if it is not read from - * or written to after invalidation. - */ - public class FieldDataCacheCleaner implements Runnable { - private final Cache cache; - private final ESLogger logger; - private final ThreadPool threadPool; - private final TimeValue interval; - - public FieldDataCacheCleaner(Cache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) { - this.cache = cache; - this.logger = logger; - this.threadPool = threadPool; - this.interval = interval; - } - - @Override - public void run() { - long startTimeNS = System.nanoTime(); - if (logger.isTraceEnabled()) { - logger.trace("running periodic field data cache cleanup"); - } - try { - this.cache.refresh(); - } catch (Exception e) { - logger.warn("Exception during periodic field data cache cleanup:", e); - } - if (logger.isTraceEnabled()) { - logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)); - } - // Reschedule itself to run again if not closed - if (closed == false) { - threadPool.schedule(interval, ThreadPool.Names.SAME, this); - } - } - } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index db942ffd284..88d21764abb 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -392,7 +392,6 @@ public class Node implements Closeable { toClose.add(injector.getInstance(IndicesService.class)); // close filter/fielddata caches after indices toClose.add(injector.getInstance(IndicesQueryCache.class)); - toClose.add(injector.getInstance(IndicesFieldDataCache.class)); toClose.add(injector.getInstance(IndicesStore.class)); toClose.add(() ->stopWatch.stop().start("routing")); toClose.add(injector.getInstance(RoutingService.class)); diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 4fdbf651a6f..f2245cc7de0 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.cache.query.index.IndexQueryCache; import org.elasticsearch.index.cache.query.none.NoneQueryCache; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.ShardId; @@ -96,6 +97,8 @@ public class IndexModuleTests extends ESTestCase { public void addPendingDelete(ShardId shardId, IndexSettings indexSettings) { } }; + + private final IndexFieldDataCache.Listener listener = new IndexFieldDataCache.Listener() {}; private MapperRegistry mapperRegistry; static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException { @@ -106,7 +109,6 @@ public class IndexModuleTests extends ESTestCase { CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool); BigArrays bigArrays = new BigArrays(recycler, circuitBreakerService); - IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndicesFieldDataCacheListener(circuitBreakerService), threadPool); Set scriptEngines = Collections.emptySet(); scriptEngines.addAll(Arrays.asList(scriptEngineServices)); ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.emptyList()); @@ -114,7 +116,7 @@ public class IndexModuleTests extends ESTestCase { ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry); ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings); IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap()); - return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService); + return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService); } @Override @@ -143,7 +145,7 @@ public class IndexModuleTests extends ESTestCase { IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); module.setSearcherWrapper((s) -> new Wrapper()); module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class)); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.getSearcherWrapper() instanceof Wrapper); assertSame(indexService.getEngineFactory(), module.engineFactory.get()); indexService.close("simon says", false); @@ -161,7 +163,7 @@ public class IndexModuleTests extends ESTestCase { final Index index = indexSettings.getIndex(); IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); module.addIndexStore("foo_store", FooStore::new); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.getIndexStore() instanceof FooStore); try { module.addIndexStore("foo_store", FooStore::new); @@ -184,7 +186,8 @@ public class IndexModuleTests extends ESTestCase { IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); Consumer listener = (s) -> {}; module.addIndexEventListener(eventListener); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + new IndicesFieldDataCache(settings, this.listener)); IndexSettings x = indexService.getIndexSettings(); assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap()); assertEquals(x.getIndex(), index); @@ -208,7 +211,8 @@ public class IndexModuleTests extends ESTestCase { } - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + new IndicesFieldDataCache(settings, listener)); assertSame(booleanSetting, indexService.getIndexSettings().getScopedSettings().get(booleanSetting.getKey())); indexService.close("simon says", false); @@ -234,7 +238,8 @@ public class IndexModuleTests extends ESTestCase { } }); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + new IndicesFieldDataCache(settings, listener)); SimilarityService similarityService = indexService.similarityService(); assertNotNull(similarityService.getSimilarity("my_similarity")); assertTrue(similarityService.getSimilarity("my_similarity").get() instanceof TestSimilarity); @@ -251,7 +256,8 @@ public class IndexModuleTests extends ESTestCase { .build(); IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); try { - module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + new IndicesFieldDataCache(settings, listener)); } catch (IllegalArgumentException ex) { assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage()); } @@ -265,7 +271,8 @@ public class IndexModuleTests extends ESTestCase { .build(); IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); try { - module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + new IndicesFieldDataCache(settings, listener)); } catch (IllegalArgumentException ex) { assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage()); } @@ -312,7 +319,8 @@ public class IndexModuleTests extends ESTestCase { assertEquals(e.getMessage(), "Can't register the same [query_cache] more than once for [custom]"); } - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.cache().query() instanceof CustomQueryCache); indexService.close("simon says", false); } @@ -322,7 +330,8 @@ public class IndexModuleTests extends ESTestCase { .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, + new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.cache().query() instanceof IndexQueryCache); indexService.close("simon says", false); } diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java index 0bdbfb58722..6b3f0ddbd3f 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper; import org.elasticsearch.index.mapper.geo.GeoPointFieldMapperLegacy; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -132,7 +133,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase { .put(IndexMetaData.SETTING_VERSION_CREATED, version).build(); indexService = createIndex("test", settings); mapperService = indexService.mapperService(); - indicesFieldDataCache = getInstanceFromNode(IndicesFieldDataCache.class); + indicesFieldDataCache = getInstanceFromNode(IndicesService.class).getIndicesFieldDataCache(); ifdService = indexService.fieldData(); // LogByteSizeMP to preserve doc ID order writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy())); diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java b/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java index da2899bf2c3..0a2a3c4d79c 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.mapper.core.LongFieldMapper; import org.elasticsearch.index.mapper.core.ShortFieldMapper; import org.elasticsearch.index.mapper.core.StringFieldMapper; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -131,10 +132,10 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase { public void testFieldDataCacheListener() throws Exception { final IndexService indexService = createIndex("test"); - IndexFieldDataService shardPrivateService = indexService.fieldData(); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); // copy the ifdService since we can set the listener only once. final IndexFieldDataService ifdService = new IndexFieldDataService(indexService.getIndexSettings(), - getInstanceFromNode(IndicesFieldDataCache.class), getInstanceFromNode(CircuitBreakerService.class), indexService.mapperService()); + indicesService.getIndicesFieldDataCache(), indicesService.getCircuitBreakerService(), indexService.mapperService()); final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1)); final MappedFieldType mapper1 = MapperBuilders.stringField("s").tokenized(false).docValues(true).fieldDataSettings(Settings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx).fieldType(); @@ -205,7 +206,7 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase { private void doTestRequireDocValues(MappedFieldType ft) { ThreadPool threadPool = new ThreadPool("random_threadpool_name"); try { - IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null, threadPool); + IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null); IndexFieldDataService ifds = new IndexFieldDataService(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, null); ft.setName("some_long"); ft.setHasDocValues(true); @@ -238,7 +239,7 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase { ThreadPool threadPool = new ThreadPool("random_threadpool_name"); StringFieldMapper.StringFieldType ft = new StringFieldMapper.StringFieldType(); try { - IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null, threadPool); + IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null); IndexFieldDataService ifds = new IndexFieldDataService(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, null); ft.setName("some_str"); ft.setFieldDataType(new FieldDataType("string", Settings.builder().put(FieldDataType.FORMAT_KEY, "disabled").build())); diff --git a/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java b/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java index d9f70978ee2..29d556c9a8a 100644 --- a/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/query/AbstractQueryTestCase.java @@ -71,6 +71,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.support.QueryParsers; @@ -271,7 +272,9 @@ public abstract class AbstractQueryTestCase> SimilarityService similarityService = new SimilarityService(idxSettings, Collections.emptyMap()); MapperRegistry mapperRegistry = injector.getInstance(MapperRegistry.class); MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> queryShardContext); - indexFieldDataService = new IndexFieldDataService(idxSettings, injector.getInstance(IndicesFieldDataCache.class), injector.getInstance(CircuitBreakerService.class), mapperService); + IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() { + }); + indexFieldDataService = new IndexFieldDataService(idxSettings, indicesFieldDataCache, injector.getInstance(CircuitBreakerService.class), mapperService); BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() { @Override public void onCache(ShardId shardId, Accountable accountable) { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java b/core/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java index 4807cf61dea..a406106f110 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.sort.SortOrder; @@ -176,7 +177,7 @@ public class RandomExceptionCircuitBreakerIT extends ESIntegTestCase { // Since .cleanUp() is no longer called on cache clear, we need to call it on each node manually for (String node : internalCluster().getNodeNames()) { - final IndicesFieldDataCache fdCache = internalCluster().getInstance(IndicesFieldDataCache.class, node); + final IndicesFieldDataCache fdCache = internalCluster().getInstance(IndicesService.class, node).getIndicesFieldDataCache(); // Clean up the cache, ensuring that entries' listeners have been called fdCache.getCache().refresh(); } diff --git a/modules/lang-mustache/src/test/java/org/elasticsearch/messy/tests/TemplateQueryParserTests.java b/modules/lang-mustache/src/test/java/org/elasticsearch/messy/tests/TemplateQueryParserTests.java index 2e19f660047..e0d8fc3a506 100644 --- a/modules/lang-mustache/src/test/java/org/elasticsearch/messy/tests/TemplateQueryParserTests.java +++ b/modules/lang-mustache/src/test/java/org/elasticsearch/messy/tests/TemplateQueryParserTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryShardContext; @@ -138,7 +139,8 @@ public class TemplateQueryParserTests extends ESTestCase { SimilarityService similarityService = new SimilarityService(idxSettings, Collections.emptyMap()); MapperRegistry mapperRegistry = new IndicesModule().getMapperRegistry(); MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> context); - IndexFieldDataService indexFieldDataService =new IndexFieldDataService(idxSettings, injector.getInstance(IndicesFieldDataCache.class), injector.getInstance(CircuitBreakerService.class), mapperService); + IndicesFieldDataCache cache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {}); + IndexFieldDataService indexFieldDataService =new IndexFieldDataService(idxSettings, cache, injector.getInstance(CircuitBreakerService.class), mapperService); BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() { @Override public void onCache(ShardId shardId, Accountable accountable) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 61a58c59b1e..cfc820460f8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1819,7 +1819,7 @@ public final class InternalTestCluster extends TestCluster { // network request, because a network request can increment one // of the breakers for (NodeAndClient nodeAndClient : nodes.values()) { - final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesFieldDataCache.class, nodeAndClient.node); + final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesService.class, nodeAndClient.node).getIndicesFieldDataCache(); // Clean up the cache, ensuring that entries' listeners have been called fdCache.getCache().refresh();