mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-23 13:26:02 +00:00
Simplify IndicesFieldDataCache and detach from guice
Indices level field data cacheing belongs into IndicesService and doesn't need to be wired by guice. This commit also moves the async cache refresh out of the class into IndicesService such that threadpool dependencies are removed and testing / creation becomes simpler.
This commit is contained in:
parent
608f4f8b6e
commit
00be9e58f2
@ -60,6 +60,7 @@ import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||
import org.elasticsearch.http.HttpTransportSettings;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.analysis.HunspellService;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
@ -290,7 +291,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
ScriptService.SCRIPT_CACHE_SIZE_SETTING,
|
||||
ScriptService.SCRIPT_CACHE_EXPIRE_SETTING,
|
||||
ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING,
|
||||
IndicesFieldDataCache.INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING,
|
||||
IndicesService.INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING,
|
||||
IndicesFieldDataCache.INDICES_FIELDDATA_CACHE_SIZE_KEY,
|
||||
IndicesRequestCache.INDICES_CACHE_QUERY_SIZE,
|
||||
IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE,
|
||||
|
@ -38,6 +38,7 @@ import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -240,7 +241,7 @@ public final class IndexModule {
|
||||
IndexSearcherWrapper newWrapper(final IndexService indexService);
|
||||
}
|
||||
|
||||
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry,
|
||||
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache,
|
||||
IndexingOperationListener... listeners) throws IOException {
|
||||
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get();
|
||||
IndexEventListener eventListener = freeze();
|
||||
@ -264,7 +265,7 @@ public final class IndexModule {
|
||||
final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
|
||||
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache());
|
||||
return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(),
|
||||
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, listeners);
|
||||
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, listeners);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -76,6 +76,7 @@ import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.AliasFilterParsingException;
|
||||
import org.elasticsearch.indices.InvalidAliasNameException;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
@ -122,13 +123,14 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
IndexEventListener eventListener,
|
||||
IndexModule.IndexSearcherWrapperFactory wrapperFactory,
|
||||
MapperRegistry mapperRegistry,
|
||||
IndicesFieldDataCache indicesFieldDataCache,
|
||||
IndexingOperationListener... listenersIn) throws IOException {
|
||||
super(indexSettings);
|
||||
this.indexSettings = indexSettings;
|
||||
this.analysisService = registry.build(indexSettings);
|
||||
this.similarityService = similarityService;
|
||||
this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry, IndexService.this::getQueryShardContext);
|
||||
this.indexFieldData = new IndexFieldDataService(indexSettings, nodeServicesProvider.getIndicesFieldDataCache(), nodeServicesProvider.getCircuitBreakerService(), mapperService);
|
||||
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, nodeServicesProvider.getCircuitBreakerService(), mapperService);
|
||||
this.shardStoreDeleter = shardStoreDeleter;
|
||||
this.eventListener = eventListener;
|
||||
this.nodeEnv = nodeEnv;
|
||||
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
@ -45,11 +44,10 @@ public final class NodeServicesProvider {
|
||||
private final Client client;
|
||||
private final IndicesQueriesRegistry indicesQueriesRegistry;
|
||||
private final ScriptService scriptService;
|
||||
private final IndicesFieldDataCache indicesFieldDataCache;
|
||||
private final CircuitBreakerService circuitBreakerService;
|
||||
|
||||
@Inject
|
||||
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) {
|
||||
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
this.threadPool = threadPool;
|
||||
this.indicesQueryCache = indicesQueryCache;
|
||||
this.warmer = warmer;
|
||||
@ -57,7 +55,6 @@ public final class NodeServicesProvider {
|
||||
this.client = client;
|
||||
this.indicesQueriesRegistry = indicesQueriesRegistry;
|
||||
this.scriptService = scriptService;
|
||||
this.indicesFieldDataCache = indicesFieldDataCache;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
}
|
||||
|
||||
@ -87,10 +84,6 @@ public final class NodeServicesProvider {
|
||||
return scriptService;
|
||||
}
|
||||
|
||||
public IndicesFieldDataCache getIndicesFieldDataCache() {
|
||||
return indicesFieldDataCache;
|
||||
}
|
||||
|
||||
public CircuitBreakerService getCircuitBreakerService() {
|
||||
return circuitBreakerService;
|
||||
}
|
||||
|
@ -48,12 +48,12 @@ public interface IndexFieldDataCache {
|
||||
/**
|
||||
* Called after the fielddata is loaded during the cache phase
|
||||
*/
|
||||
void onCache(ShardId shardId, String fieldName, FieldDataType fieldDataType, Accountable ramUsage);
|
||||
default void onCache(ShardId shardId, String fieldName, FieldDataType fieldDataType, Accountable ramUsage){}
|
||||
|
||||
/**
|
||||
* Called after the fielddata is unloaded
|
||||
*/
|
||||
void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes);
|
||||
default void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes){}
|
||||
}
|
||||
|
||||
class None implements IndexFieldDataCache {
|
||||
|
@ -55,12 +55,9 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
|
||||
import org.elasticsearch.index.mapper.ip.IpFieldMapper;
|
||||
import org.elasticsearch.index.mapper.object.ObjectMapper;
|
||||
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
|
||||
import org.elasticsearch.index.termvectors.TermVectorsService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
|
||||
import org.elasticsearch.indices.flush.SyncedFlushService;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
@ -165,13 +162,11 @@ public class IndicesModule extends AbstractModule {
|
||||
bind(SyncedFlushService.class).asEagerSingleton();
|
||||
bind(IndicesQueryCache.class).asEagerSingleton();
|
||||
bind(IndicesRequestCache.class).asEagerSingleton();
|
||||
bind(IndicesFieldDataCache.class).asEagerSingleton();
|
||||
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
|
||||
bind(IndicesTTLService.class).asEagerSingleton();
|
||||
bind(IndicesWarmer.class).asEagerSingleton();
|
||||
bind(UpdateHelper.class).asEagerSingleton();
|
||||
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
|
||||
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
|
||||
bind(NodeServicesProvider.class).asEagerSingleton();
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.indices;
|
||||
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.util.Accountable;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
@ -33,11 +34,15 @@ import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
@ -51,6 +56,8 @@ import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.NodeServicesProvider;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||
import org.elasticsearch.index.flush.FlushStats;
|
||||
import org.elasticsearch.index.get.GetStats;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
@ -63,6 +70,8 @@ import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexingStats;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
import org.elasticsearch.plugins.PluginsService;
|
||||
@ -81,6 +90,7 @@ import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
@ -94,6 +104,7 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
|
||||
public class IndicesService extends AbstractLifecycleComponent<IndicesService> implements Iterable<IndexService>, IndexService.ShardStoreDeleter {
|
||||
|
||||
public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
|
||||
public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
|
||||
private final PluginsService pluginsService;
|
||||
private final NodeEnvironment nodeEnv;
|
||||
private final TimeValue shardsClosedTimeout;
|
||||
@ -102,23 +113,30 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||
private final ClusterService clusterService;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private final IndexScopedSettings indexScopeSetting;
|
||||
private final IndicesFieldDataCache indicesFieldDataCache;
|
||||
private final FieldDataCacheCleaner fieldDataCacheCleaner;
|
||||
private final ThreadPool threadPool;
|
||||
private volatile Map<String, IndexService> indices = emptyMap();
|
||||
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
|
||||
private final OldShardsStats oldShardsStats = new OldShardsStats();
|
||||
private final IndexStoreConfig indexStoreConfig;
|
||||
private final MapperRegistry mapperRegistry;
|
||||
private final IndexingMemoryController indexingMemoryController;
|
||||
private final TimeValue cleanInterval;
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
// Start thread that will manage cleaning the field data cache periodically
|
||||
threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.fieldDataCacheCleaner);
|
||||
}
|
||||
|
||||
@Inject
|
||||
public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv,
|
||||
ClusterSettings clusterSettings, AnalysisRegistry analysisRegistry,
|
||||
IndicesQueriesRegistry indicesQueriesRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool, IndexScopedSettings indexScopedSettings) {
|
||||
ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool, IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.pluginsService = pluginsService;
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
|
||||
@ -132,6 +150,17 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
|
||||
indexingMemoryController = new IndexingMemoryController(settings, threadPool, this);
|
||||
this.indexScopeSetting = indexScopedSettings;
|
||||
this.indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
|
||||
@Override
|
||||
public void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
|
||||
assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]";
|
||||
circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
|
||||
}
|
||||
});
|
||||
this.cleanInterval = INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING.get(settings);
|
||||
this.fieldDataCacheCleaner = new FieldDataCacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -165,7 +194,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController);
|
||||
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -320,7 +349,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||
indexModule.addIndexEventListener(oldShardsStats);
|
||||
final IndexEventListener listener = indexModule.freeze();
|
||||
listener.beforeIndexCreated(index, idxSettings.getSettings());
|
||||
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indexingMemoryController);
|
||||
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
|
||||
boolean success = false;
|
||||
try {
|
||||
assert indexService.getIndexEventListener() == listener;
|
||||
@ -381,6 +410,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||
}
|
||||
}
|
||||
|
||||
public IndicesFieldDataCache getIndicesFieldDataCache() {
|
||||
return indicesFieldDataCache;
|
||||
}
|
||||
|
||||
static class OldShardsStats implements IndexEventListener {
|
||||
|
||||
final SearchStats searchStats = new SearchStats();
|
||||
@ -760,4 +793,51 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
||||
public AnalysisRegistry getAnalysis() {
|
||||
return analysisRegistry;
|
||||
}
|
||||
|
||||
/**
|
||||
* FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache
|
||||
* periodically. In this case it is the field data cache, because a cache that
|
||||
* has an entry invalidated may not clean up the entry if it is not read from
|
||||
* or written to after invalidation.
|
||||
*/
|
||||
private final static class FieldDataCacheCleaner implements Runnable, Releasable {
|
||||
|
||||
private final IndicesFieldDataCache cache;
|
||||
private final ESLogger logger;
|
||||
private final ThreadPool threadPool;
|
||||
private final TimeValue interval;
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
public FieldDataCacheCleaner(IndicesFieldDataCache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) {
|
||||
this.cache = cache;
|
||||
this.logger = logger;
|
||||
this.threadPool = threadPool;
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long startTimeNS = System.nanoTime();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("running periodic field data cache cleanup");
|
||||
}
|
||||
try {
|
||||
this.cache.getCache().refresh();
|
||||
} catch (Exception e) {
|
||||
logger.warn("Exception during periodic field data cache cleanup:", e);
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS));
|
||||
}
|
||||
// Reschedule itself to run again if not closed
|
||||
if (closed.get() == false) {
|
||||
threadPool.schedule(interval, ThreadPool.Names.SAME, this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closed.compareAndSet(false, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,14 +30,12 @@ import org.elasticsearch.common.cache.CacheBuilder;
|
||||
import org.elasticsearch.common.cache.RemovalListener;
|
||||
import org.elasticsearch.common.cache.RemovalNotification;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.fielddata.AtomicFieldData;
|
||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||
@ -45,7 +43,6 @@ import org.elasticsearch.index.fielddata.IndexFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -55,20 +52,12 @@ import java.util.function.ToLongBiFunction;
|
||||
*/
|
||||
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable>, Releasable{
|
||||
|
||||
public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> INDICES_FIELDDATA_CACHE_SIZE_KEY = Setting.byteSizeSetting("indices.fielddata.cache.size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
|
||||
|
||||
|
||||
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
|
||||
private final IndexFieldDataCache.Listener indicesFieldDataCacheListener;
|
||||
private final Cache<Key, Accountable> cache;
|
||||
private final TimeValue cleanInterval;
|
||||
private final ThreadPool threadPool;
|
||||
private volatile boolean closed = false;
|
||||
|
||||
@Inject
|
||||
public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener, ThreadPool threadPool) {
|
||||
public IndicesFieldDataCache(Settings settings, IndexFieldDataCache.Listener indicesFieldDataCacheListener) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
|
||||
final long sizeInBytes = INDICES_FIELDDATA_CACHE_SIZE_KEY.get(settings).bytes();
|
||||
CacheBuilder<Key, Accountable> cacheBuilder = CacheBuilder.<Key, Accountable>builder()
|
||||
@ -76,19 +65,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
||||
if (sizeInBytes > 0) {
|
||||
cacheBuilder.setMaximumWeight(sizeInBytes).weigher(new FieldDataWeigher());
|
||||
}
|
||||
|
||||
cache = cacheBuilder.build();
|
||||
|
||||
this.cleanInterval = INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING.get(settings);
|
||||
// Start thread that will manage cleaning the field data cache periodically
|
||||
threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME,
|
||||
new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cache.invalidateAll();
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
public IndexFieldDataCache buildIndexFieldDataCache(IndexFieldDataCache.Listener listener, Index index, String fieldName, FieldDataType fieldDataType) {
|
||||
@ -260,44 +242,5 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache
|
||||
* periodically. In this case it is the field data cache, because a cache that
|
||||
* has an entry invalidated may not clean up the entry if it is not read from
|
||||
* or written to after invalidation.
|
||||
*/
|
||||
public class FieldDataCacheCleaner implements Runnable {
|
||||
|
||||
private final Cache<Key, Accountable> cache;
|
||||
private final ESLogger logger;
|
||||
private final ThreadPool threadPool;
|
||||
private final TimeValue interval;
|
||||
|
||||
public FieldDataCacheCleaner(Cache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) {
|
||||
this.cache = cache;
|
||||
this.logger = logger;
|
||||
this.threadPool = threadPool;
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long startTimeNS = System.nanoTime();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("running periodic field data cache cleanup");
|
||||
}
|
||||
try {
|
||||
this.cache.refresh();
|
||||
} catch (Exception e) {
|
||||
logger.warn("Exception during periodic field data cache cleanup:", e);
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS));
|
||||
}
|
||||
// Reschedule itself to run again if not closed
|
||||
if (closed == false) {
|
||||
threadPool.schedule(interval, ThreadPool.Names.SAME, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -392,7 +392,6 @@ public class Node implements Closeable {
|
||||
toClose.add(injector.getInstance(IndicesService.class));
|
||||
// close filter/fielddata caches after indices
|
||||
toClose.add(injector.getInstance(IndicesQueryCache.class));
|
||||
toClose.add(injector.getInstance(IndicesFieldDataCache.class));
|
||||
toClose.add(injector.getInstance(IndicesStore.class));
|
||||
toClose.add(() ->stopWatch.stop().start("routing"));
|
||||
toClose.add(injector.getInstance(RoutingService.class));
|
||||
|
@ -44,6 +44,7 @@ import org.elasticsearch.index.cache.query.QueryCache;
|
||||
import org.elasticsearch.index.cache.query.index.IndexQueryCache;
|
||||
import org.elasticsearch.index.cache.query.none.NoneQueryCache;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexSearcherWrapper;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
@ -96,6 +97,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
public void addPendingDelete(ShardId shardId, IndexSettings indexSettings) {
|
||||
}
|
||||
};
|
||||
|
||||
private final IndexFieldDataCache.Listener listener = new IndexFieldDataCache.Listener() {};
|
||||
private MapperRegistry mapperRegistry;
|
||||
|
||||
static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException {
|
||||
@ -106,7 +109,6 @@ public class IndexModuleTests extends ESTestCase {
|
||||
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
||||
PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool);
|
||||
BigArrays bigArrays = new BigArrays(recycler, circuitBreakerService);
|
||||
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndicesFieldDataCacheListener(circuitBreakerService), threadPool);
|
||||
Set<ScriptEngineService> scriptEngines = Collections.emptySet();
|
||||
scriptEngines.addAll(Arrays.asList(scriptEngineServices));
|
||||
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.emptyList());
|
||||
@ -114,7 +116,7 @@ public class IndexModuleTests extends ESTestCase {
|
||||
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
|
||||
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
|
||||
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap());
|
||||
return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService);
|
||||
return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -143,7 +145,7 @@ public class IndexModuleTests extends ESTestCase {
|
||||
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
|
||||
module.setSearcherWrapper((s) -> new Wrapper());
|
||||
module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
|
||||
assertSame(indexService.getEngineFactory(), module.engineFactory.get());
|
||||
indexService.close("simon says", false);
|
||||
@ -161,7 +163,7 @@ public class IndexModuleTests extends ESTestCase {
|
||||
final Index index = indexSettings.getIndex();
|
||||
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
|
||||
module.addIndexStore("foo_store", FooStore::new);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.getIndexStore() instanceof FooStore);
|
||||
try {
|
||||
module.addIndexStore("foo_store", FooStore::new);
|
||||
@ -184,7 +186,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
|
||||
Consumer<Settings> listener = (s) -> {};
|
||||
module.addIndexEventListener(eventListener);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, this.listener));
|
||||
IndexSettings x = indexService.getIndexSettings();
|
||||
assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap());
|
||||
assertEquals(x.getIndex(), index);
|
||||
@ -208,7 +211,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
|
||||
}
|
||||
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
assertSame(booleanSetting, indexService.getIndexSettings().getScopedSettings().get(booleanSetting.getKey()));
|
||||
|
||||
indexService.close("simon says", false);
|
||||
@ -234,7 +238,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
}
|
||||
});
|
||||
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
SimilarityService similarityService = indexService.similarityService();
|
||||
assertNotNull(similarityService.getSimilarity("my_similarity"));
|
||||
assertTrue(similarityService.getSimilarity("my_similarity").get() instanceof TestSimilarity);
|
||||
@ -251,7 +256,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
.build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
|
||||
try {
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
|
||||
}
|
||||
@ -265,7 +271,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
.build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
|
||||
try {
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
|
||||
}
|
||||
@ -312,7 +319,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
assertEquals(e.getMessage(), "Can't register the same [query_cache] more than once for [custom]");
|
||||
}
|
||||
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
|
||||
indexService.close("simon says", false);
|
||||
}
|
||||
@ -322,7 +330,8 @@ public class IndexModuleTests extends ESTestCase {
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
|
||||
indexService.close("simon says", false);
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper;
|
||||
import org.elasticsearch.index.mapper.geo.GeoPointFieldMapperLegacy;
|
||||
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
@ -132,7 +133,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, version).build();
|
||||
indexService = createIndex("test", settings);
|
||||
mapperService = indexService.mapperService();
|
||||
indicesFieldDataCache = getInstanceFromNode(IndicesFieldDataCache.class);
|
||||
indicesFieldDataCache = getInstanceFromNode(IndicesService.class).getIndicesFieldDataCache();
|
||||
ifdService = indexService.fieldData();
|
||||
// LogByteSizeMP to preserve doc ID order
|
||||
writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy()));
|
||||
|
@ -205,7 +205,7 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
|
||||
private void doTestRequireDocValues(MappedFieldType ft) {
|
||||
ThreadPool threadPool = new ThreadPool("random_threadpool_name");
|
||||
try {
|
||||
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null, threadPool);
|
||||
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null);
|
||||
IndexFieldDataService ifds = new IndexFieldDataService(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, null);
|
||||
ft.setName("some_long");
|
||||
ft.setHasDocValues(true);
|
||||
@ -238,7 +238,7 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
|
||||
ThreadPool threadPool = new ThreadPool("random_threadpool_name");
|
||||
StringFieldMapper.StringFieldType ft = new StringFieldMapper.StringFieldType();
|
||||
try {
|
||||
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null, threadPool);
|
||||
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null);
|
||||
IndexFieldDataService ifds = new IndexFieldDataService(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, null);
|
||||
ft.setName("some_str");
|
||||
ft.setFieldDataType(new FieldDataType("string", Settings.builder().put(FieldDataType.FORMAT_KEY, "disabled").build()));
|
||||
|
@ -56,7 +56,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.common.unit.Fuzziness;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
@ -71,6 +70,7 @@ import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.query.support.QueryParsers;
|
||||
@ -271,7 +271,9 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
|
||||
SimilarityService similarityService = new SimilarityService(idxSettings, Collections.emptyMap());
|
||||
MapperRegistry mapperRegistry = injector.getInstance(MapperRegistry.class);
|
||||
MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> queryShardContext);
|
||||
indexFieldDataService = new IndexFieldDataService(idxSettings, injector.getInstance(IndicesFieldDataCache.class), injector.getInstance(CircuitBreakerService.class), mapperService);
|
||||
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
|
||||
});
|
||||
indexFieldDataService = new IndexFieldDataService(idxSettings, indicesFieldDataCache, injector.getInstance(CircuitBreakerService.class), mapperService);
|
||||
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() {
|
||||
@Override
|
||||
public void onCache(ShardId shardId, Accountable accountable) {
|
||||
|
@ -1819,7 +1819,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||
// network request, because a network request can increment one
|
||||
// of the breakers
|
||||
for (NodeAndClient nodeAndClient : nodes.values()) {
|
||||
final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesFieldDataCache.class, nodeAndClient.node);
|
||||
final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesService.class, nodeAndClient.node).getIndicesFieldDataCache();
|
||||
// Clean up the cache, ensuring that entries' listeners have been called
|
||||
fdCache.getCache().refresh();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user