Merge pull request #16469 from s1monw/simplify_indices_field_data_cache

Simplify IndicesFieldDataCache and detach from guice
This commit is contained in:
Simon Willnauer 2016-02-05 14:24:19 +01:00
commit 778ec84a7c
16 changed files with 139 additions and 102 deletions

View File

@ -60,6 +60,7 @@ import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.HunspellService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
@ -290,7 +291,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ScriptService.SCRIPT_CACHE_SIZE_SETTING,
ScriptService.SCRIPT_CACHE_EXPIRE_SETTING,
ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING,
IndicesFieldDataCache.INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING,
IndicesService.INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING,
IndicesFieldDataCache.INDICES_FIELDDATA_CACHE_SIZE_KEY,
IndicesRequestCache.INDICES_CACHE_QUERY_SIZE,
IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE,

View File

@ -38,6 +38,7 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import java.io.IOException;
@ -240,7 +241,7 @@ public final class IndexModule {
IndexSearcherWrapper newWrapper(final IndexService indexService);
}
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry,
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache,
IndexingOperationListener... listeners) throws IOException {
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get();
IndexEventListener eventListener = freeze();
@ -264,7 +265,7 @@ public final class IndexModule {
final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache());
return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(),
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, listeners);
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, listeners);
}
}

View File

@ -76,6 +76,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.AliasFilterParsingException;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.threadpool.ThreadPool;
@ -122,13 +123,14 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
IndexEventListener eventListener,
IndexModule.IndexSearcherWrapperFactory wrapperFactory,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
IndexingOperationListener... listenersIn) throws IOException {
super(indexSettings);
this.indexSettings = indexSettings;
this.analysisService = registry.build(indexSettings);
this.similarityService = similarityService;
this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry, IndexService.this::getQueryShardContext);
this.indexFieldData = new IndexFieldDataService(indexSettings, nodeServicesProvider.getIndicesFieldDataCache(), nodeServicesProvider.getCircuitBreakerService(), mapperService);
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, nodeServicesProvider.getCircuitBreakerService(), mapperService);
this.shardStoreDeleter = shardStoreDeleter;
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@ -45,11 +44,10 @@ public final class NodeServicesProvider {
private final Client client;
private final IndicesQueriesRegistry indicesQueriesRegistry;
private final ScriptService scriptService;
private final IndicesFieldDataCache indicesFieldDataCache;
private final CircuitBreakerService circuitBreakerService;
@Inject
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService) {
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) {
this.threadPool = threadPool;
this.indicesQueryCache = indicesQueryCache;
this.warmer = warmer;
@ -57,7 +55,6 @@ public final class NodeServicesProvider {
this.client = client;
this.indicesQueriesRegistry = indicesQueriesRegistry;
this.scriptService = scriptService;
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
}
@ -87,10 +84,6 @@ public final class NodeServicesProvider {
return scriptService;
}
public IndicesFieldDataCache getIndicesFieldDataCache() {
return indicesFieldDataCache;
}
public CircuitBreakerService getCircuitBreakerService() {
return circuitBreakerService;
}

View File

@ -48,12 +48,12 @@ public interface IndexFieldDataCache {
/**
* Called after the fielddata is loaded during the cache phase
*/
void onCache(ShardId shardId, String fieldName, FieldDataType fieldDataType, Accountable ramUsage);
default void onCache(ShardId shardId, String fieldName, FieldDataType fieldDataType, Accountable ramUsage){}
/**
* Called after the fielddata is unloaded
*/
void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes);
default void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes){}
}
class None implements IndexFieldDataCache {

View File

@ -55,12 +55,9 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.mapper.ip.IpFieldMapper;
import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
import org.elasticsearch.index.termvectors.TermVectorsService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
@ -165,13 +162,11 @@ public class IndicesModule extends AbstractModule {
bind(SyncedFlushService.class).asEagerSingleton();
bind(IndicesQueryCache.class).asEagerSingleton();
bind(IndicesRequestCache.class).asEagerSingleton();
bind(IndicesFieldDataCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
bind(NodeServicesProvider.class).asEagerSingleton();
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
@ -33,11 +34,15 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -51,6 +56,8 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.merge.MergeStats;
@ -63,6 +70,8 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.plugins.PluginsService;
@ -81,6 +90,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import static java.util.Collections.emptyMap;
@ -94,6 +104,7 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
public class IndicesService extends AbstractLifecycleComponent<IndicesService> implements Iterable<IndexService>, IndexService.ShardStoreDeleter {
public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
private final PluginsService pluginsService;
private final NodeEnvironment nodeEnv;
private final TimeValue shardsClosedTimeout;
@ -102,23 +113,31 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final IndexScopedSettings indexScopeSetting;
private final IndicesFieldDataCache indicesFieldDataCache;
private final FieldDataCacheCleaner fieldDataCacheCleaner;
private final ThreadPool threadPool;
private final CircuitBreakerService circuitBreakerService;
private volatile Map<String, IndexService> indices = emptyMap();
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final OldShardsStats oldShardsStats = new OldShardsStats();
private final IndexStoreConfig indexStoreConfig;
private final MapperRegistry mapperRegistry;
private final IndexingMemoryController indexingMemoryController;
private final TimeValue cleanInterval;
@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.fieldDataCacheCleaner);
}
@Inject
public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv,
ClusterSettings clusterSettings, AnalysisRegistry analysisRegistry,
IndicesQueriesRegistry indicesQueriesRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool, IndexScopedSettings indexScopedSettings) {
ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool, IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService) {
super(settings);
this.threadPool = threadPool;
this.pluginsService = pluginsService;
this.nodeEnv = nodeEnv;
this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
@ -132,6 +151,18 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
indexingMemoryController = new IndexingMemoryController(settings, threadPool, this);
this.indexScopeSetting = indexScopedSettings;
this.circuitBreakerService = circuitBreakerService;
this.indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
@Override
public void onRemoval(ShardId shardId, String fieldName, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]";
circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}
});
this.cleanInterval = INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING.get(settings);
this.fieldDataCacheCleaner = new FieldDataCacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval);
}
@Override
@ -165,7 +196,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
@Override
protected void doClose() {
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController);
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner);
}
/**
@ -320,7 +351,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
indexModule.addIndexEventListener(oldShardsStats);
final IndexEventListener listener = indexModule.freeze();
listener.beforeIndexCreated(index, idxSettings.getSettings());
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indexingMemoryController);
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
boolean success = false;
try {
assert indexService.getIndexEventListener() == listener;
@ -381,6 +412,14 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
}
public IndicesFieldDataCache getIndicesFieldDataCache() {
return indicesFieldDataCache;
}
public CircuitBreakerService getCircuitBreakerService() {
return circuitBreakerService;
}
static class OldShardsStats implements IndexEventListener {
final SearchStats searchStats = new SearchStats();
@ -760,4 +799,51 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
public AnalysisRegistry getAnalysis() {
return analysisRegistry;
}
/**
* FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache
* periodically. In this case it is the field data cache, because a cache that
* has an entry invalidated may not clean up the entry if it is not read from
* or written to after invalidation.
*/
private final static class FieldDataCacheCleaner implements Runnable, Releasable {
private final IndicesFieldDataCache cache;
private final ESLogger logger;
private final ThreadPool threadPool;
private final TimeValue interval;
private final AtomicBoolean closed = new AtomicBoolean(false);
public FieldDataCacheCleaner(IndicesFieldDataCache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) {
this.cache = cache;
this.logger = logger;
this.threadPool = threadPool;
this.interval = interval;
}
@Override
public void run() {
long startTimeNS = System.nanoTime();
if (logger.isTraceEnabled()) {
logger.trace("running periodic field data cache cleanup");
}
try {
this.cache.getCache().refresh();
} catch (Exception e) {
logger.warn("Exception during periodic field data cache cleanup:", e);
}
if (logger.isTraceEnabled()) {
logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS));
}
// Reschedule itself to run again if not closed
if (closed.get() == false) {
threadPool.schedule(interval, ThreadPool.Names.SAME, this);
}
}
@Override
public void close() {
closed.compareAndSet(false, true);
}
}
}

View File

@ -30,14 +30,12 @@ import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
@ -45,7 +43,6 @@ import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
@ -55,20 +52,12 @@ import java.util.function.ToLongBiFunction;
*/
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable>, Releasable{
public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> INDICES_FIELDDATA_CACHE_SIZE_KEY = Setting.byteSizeSetting("indices.fielddata.cache.size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
private final IndexFieldDataCache.Listener indicesFieldDataCacheListener;
private final Cache<Key, Accountable> cache;
private final TimeValue cleanInterval;
private final ThreadPool threadPool;
private volatile boolean closed = false;
@Inject
public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener, ThreadPool threadPool) {
public IndicesFieldDataCache(Settings settings, IndexFieldDataCache.Listener indicesFieldDataCacheListener) {
super(settings);
this.threadPool = threadPool;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
final long sizeInBytes = INDICES_FIELDDATA_CACHE_SIZE_KEY.get(settings).bytes();
CacheBuilder<Key, Accountable> cacheBuilder = CacheBuilder.<Key, Accountable>builder()
@ -76,19 +65,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
if (sizeInBytes > 0) {
cacheBuilder.setMaximumWeight(sizeInBytes).weigher(new FieldDataWeigher());
}
cache = cacheBuilder.build();
this.cleanInterval = INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING.get(settings);
// Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME,
new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval));
}
@Override
public void close() {
cache.invalidateAll();
this.closed = true;
}
public IndexFieldDataCache buildIndexFieldDataCache(IndexFieldDataCache.Listener listener, Index index, String fieldName, FieldDataType fieldDataType) {
@ -260,44 +242,5 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
}
}
/**
* FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache
* periodically. In this case it is the field data cache, because a cache that
* has an entry invalidated may not clean up the entry if it is not read from
* or written to after invalidation.
*/
public class FieldDataCacheCleaner implements Runnable {
private final Cache<Key, Accountable> cache;
private final ESLogger logger;
private final ThreadPool threadPool;
private final TimeValue interval;
public FieldDataCacheCleaner(Cache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) {
this.cache = cache;
this.logger = logger;
this.threadPool = threadPool;
this.interval = interval;
}
@Override
public void run() {
long startTimeNS = System.nanoTime();
if (logger.isTraceEnabled()) {
logger.trace("running periodic field data cache cleanup");
}
try {
this.cache.refresh();
} catch (Exception e) {
logger.warn("Exception during periodic field data cache cleanup:", e);
}
if (logger.isTraceEnabled()) {
logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS));
}
// Reschedule itself to run again if not closed
if (closed == false) {
threadPool.schedule(interval, ThreadPool.Names.SAME, this);
}
}
}
}

View File

@ -392,7 +392,6 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(IndicesService.class));
// close filter/fielddata caches after indices
toClose.add(injector.getInstance(IndicesQueryCache.class));
toClose.add(injector.getInstance(IndicesFieldDataCache.class));
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(() ->stopWatch.stop().start("routing"));
toClose.add(injector.getInstance(RoutingService.class));

View File

@ -44,6 +44,7 @@ import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.cache.query.index.IndexQueryCache;
import org.elasticsearch.index.cache.query.none.NoneQueryCache;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
@ -96,6 +97,8 @@ public class IndexModuleTests extends ESTestCase {
public void addPendingDelete(ShardId shardId, IndexSettings indexSettings) {
}
};
private final IndexFieldDataCache.Listener listener = new IndexFieldDataCache.Listener() {};
private MapperRegistry mapperRegistry;
static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException {
@ -106,7 +109,6 @@ public class IndexModuleTests extends ESTestCase {
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool);
BigArrays bigArrays = new BigArrays(recycler, circuitBreakerService);
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndicesFieldDataCacheListener(circuitBreakerService), threadPool);
Set<ScriptEngineService> scriptEngines = Collections.emptySet();
scriptEngines.addAll(Arrays.asList(scriptEngineServices));
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.emptyList());
@ -114,7 +116,7 @@ public class IndexModuleTests extends ESTestCase {
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap());
return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService);
return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService);
}
@Override
@ -143,7 +145,7 @@ public class IndexModuleTests extends ESTestCase {
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
module.setSearcherWrapper((s) -> new Wrapper());
module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
assertSame(indexService.getEngineFactory(), module.engineFactory.get());
indexService.close("simon says", false);
@ -161,7 +163,7 @@ public class IndexModuleTests extends ESTestCase {
final Index index = indexSettings.getIndex();
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
module.addIndexStore("foo_store", FooStore::new);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.getIndexStore() instanceof FooStore);
try {
module.addIndexStore("foo_store", FooStore::new);
@ -184,7 +186,8 @@ public class IndexModuleTests extends ESTestCase {
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
Consumer<Settings> listener = (s) -> {};
module.addIndexEventListener(eventListener);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
new IndicesFieldDataCache(settings, this.listener));
IndexSettings x = indexService.getIndexSettings();
assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap());
assertEquals(x.getIndex(), index);
@ -208,7 +211,8 @@ public class IndexModuleTests extends ESTestCase {
}
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
assertSame(booleanSetting, indexService.getIndexSettings().getScopedSettings().get(booleanSetting.getKey()));
indexService.close("simon says", false);
@ -234,7 +238,8 @@ public class IndexModuleTests extends ESTestCase {
}
});
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
SimilarityService similarityService = indexService.similarityService();
assertNotNull(similarityService.getSimilarity("my_similarity"));
assertTrue(similarityService.getSimilarity("my_similarity").get() instanceof TestSimilarity);
@ -251,7 +256,8 @@ public class IndexModuleTests extends ESTestCase {
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
try {
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
} catch (IllegalArgumentException ex) {
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
}
@ -265,7 +271,8 @@ public class IndexModuleTests extends ESTestCase {
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
try {
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
} catch (IllegalArgumentException ex) {
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
}
@ -312,7 +319,8 @@ public class IndexModuleTests extends ESTestCase {
assertEquals(e.getMessage(), "Can't register the same [query_cache] more than once for [custom]");
}
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
indexService.close("simon says", false);
}
@ -322,7 +330,8 @@ public class IndexModuleTests extends ESTestCase {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
indexService.close("simon says", false);
}

View File

@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper;
import org.elasticsearch.index.mapper.geo.GeoPointFieldMapperLegacy;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -132,7 +133,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
.put(IndexMetaData.SETTING_VERSION_CREATED, version).build();
indexService = createIndex("test", settings);
mapperService = indexService.mapperService();
indicesFieldDataCache = getInstanceFromNode(IndicesFieldDataCache.class);
indicesFieldDataCache = getInstanceFromNode(IndicesService.class).getIndicesFieldDataCache();
ifdService = indexService.fieldData();
// LogByteSizeMP to preserve doc ID order
writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy()));

View File

@ -51,6 +51,7 @@ import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.index.mapper.core.ShortFieldMapper;
import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -131,10 +132,10 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
public void testFieldDataCacheListener() throws Exception {
final IndexService indexService = createIndex("test");
IndexFieldDataService shardPrivateService = indexService.fieldData();
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
// copy the ifdService since we can set the listener only once.
final IndexFieldDataService ifdService = new IndexFieldDataService(indexService.getIndexSettings(),
getInstanceFromNode(IndicesFieldDataCache.class), getInstanceFromNode(CircuitBreakerService.class), indexService.mapperService());
indicesService.getIndicesFieldDataCache(), indicesService.getCircuitBreakerService(), indexService.mapperService());
final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
final MappedFieldType mapper1 = MapperBuilders.stringField("s").tokenized(false).docValues(true).fieldDataSettings(Settings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx).fieldType();
@ -205,7 +206,7 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
private void doTestRequireDocValues(MappedFieldType ft) {
ThreadPool threadPool = new ThreadPool("random_threadpool_name");
try {
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null, threadPool);
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null);
IndexFieldDataService ifds = new IndexFieldDataService(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, null);
ft.setName("some_long");
ft.setHasDocValues(true);
@ -238,7 +239,7 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
ThreadPool threadPool = new ThreadPool("random_threadpool_name");
StringFieldMapper.StringFieldType ft = new StringFieldMapper.StringFieldType();
try {
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null, threadPool);
IndicesFieldDataCache cache = new IndicesFieldDataCache(Settings.EMPTY, null);
IndexFieldDataService ifds = new IndexFieldDataService(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, null);
ft.setName("some_str");
ft.setFieldDataType(new FieldDataType("string", Settings.builder().put(FieldDataType.FORMAT_KEY, "disabled").build()));

View File

@ -71,6 +71,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.support.QueryParsers;
@ -271,7 +272,9 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
SimilarityService similarityService = new SimilarityService(idxSettings, Collections.emptyMap());
MapperRegistry mapperRegistry = injector.getInstance(MapperRegistry.class);
MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> queryShardContext);
indexFieldDataService = new IndexFieldDataService(idxSettings, injector.getInstance(IndicesFieldDataCache.class), injector.getInstance(CircuitBreakerService.class), mapperService);
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
});
indexFieldDataService = new IndexFieldDataService(idxSettings, indicesFieldDataCache, injector.getInstance(CircuitBreakerService.class), mapperService);
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.sort.SortOrder;
@ -176,7 +177,7 @@ public class RandomExceptionCircuitBreakerIT extends ESIntegTestCase {
// Since .cleanUp() is no longer called on cache clear, we need to call it on each node manually
for (String node : internalCluster().getNodeNames()) {
final IndicesFieldDataCache fdCache = internalCluster().getInstance(IndicesFieldDataCache.class, node);
final IndicesFieldDataCache fdCache = internalCluster().getInstance(IndicesService.class, node).getIndicesFieldDataCache();
// Clean up the cache, ensuring that entries' listeners have been called
fdCache.getCache().refresh();
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryShardContext;
@ -138,7 +139,8 @@ public class TemplateQueryParserTests extends ESTestCase {
SimilarityService similarityService = new SimilarityService(idxSettings, Collections.emptyMap());
MapperRegistry mapperRegistry = new IndicesModule().getMapperRegistry();
MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> context);
IndexFieldDataService indexFieldDataService =new IndexFieldDataService(idxSettings, injector.getInstance(IndicesFieldDataCache.class), injector.getInstance(CircuitBreakerService.class), mapperService);
IndicesFieldDataCache cache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {});
IndexFieldDataService indexFieldDataService =new IndexFieldDataService(idxSettings, cache, injector.getInstance(CircuitBreakerService.class), mapperService);
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {

View File

@ -1819,7 +1819,7 @@ public final class InternalTestCluster extends TestCluster {
// network request, because a network request can increment one
// of the breakers
for (NodeAndClient nodeAndClient : nodes.values()) {
final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesFieldDataCache.class, nodeAndClient.node);
final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesService.class, nodeAndClient.node).getIndicesFieldDataCache();
// Clean up the cache, ensuring that entries' listeners have been called
fdCache.getCache().refresh();