Move IndicesQueryCache and IndicesRequestCache into IndicesService
this is a minor cleanup that detaches `IndicesRequestCache` and `IndicesQueryCache` from guice and moves it into `IndicesService`. It also decouples the `IndexShard` and `IndexService` from these caches which are unnecessary dependencies.
This commit is contained in:
parent
ebcbe5d4c5
commit
7835525f45
|
@ -105,7 +105,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
|
|||
for (IndexShard indexShard : indexService) {
|
||||
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
|
||||
// only report on fully started shards
|
||||
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats()));
|
||||
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), indexShard.commitStats()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,17 +49,14 @@ import java.util.List;
|
|||
public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction<ClearIndicesCacheRequest, ClearIndicesCacheResponse, TransportBroadcastByNodeAction.EmptyResult> {
|
||||
|
||||
private final IndicesService indicesService;
|
||||
private final IndicesRequestCache indicesRequestCache;
|
||||
|
||||
@Inject
|
||||
public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, IndicesService indicesService,
|
||||
IndicesRequestCache indicesQueryCache, ActionFilters actionFilters,
|
||||
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
|
||||
ClearIndicesCacheRequest::new, ThreadPool.Names.MANAGEMENT);
|
||||
this.indicesService = indicesService;
|
||||
this.indicesRequestCache = indicesQueryCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -101,7 +98,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc
|
|||
}
|
||||
if (request.requestCache()) {
|
||||
clearedAtLeastOne = true;
|
||||
indicesRequestCache.clear(shard);
|
||||
indicesService.getIndicesRequestCache().clear(shard);
|
||||
}
|
||||
if (request.recycler()) {
|
||||
logger.debug("Clear CacheRecycler on index [{}]", service.index());
|
||||
|
@ -117,7 +114,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc
|
|||
} else {
|
||||
service.cache().clear("api");
|
||||
service.fieldData().clear();
|
||||
indicesRequestCache.clear(shard);
|
||||
indicesService.getIndicesRequestCache().clear(shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.Streamable;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
||||
import org.elasticsearch.index.cache.request.RequestCacheStats;
|
||||
import org.elasticsearch.index.engine.SegmentsStats;
|
||||
|
@ -40,10 +41,13 @@ import org.elasticsearch.index.refresh.RefreshStats;
|
|||
import org.elasticsearch.index.search.stats.SearchStats;
|
||||
import org.elasticsearch.index.shard.DocsStats;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.StoreStats;
|
||||
import org.elasticsearch.index.suggest.stats.SuggestStats;
|
||||
import org.elasticsearch.index.translog.TranslogStats;
|
||||
import org.elasticsearch.index.warmer.WarmerStats;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -122,7 +126,8 @@ public class CommonStats implements Streamable, ToXContent {
|
|||
}
|
||||
|
||||
|
||||
public CommonStats(IndexShard indexShard, CommonStatsFlags flags) {
|
||||
public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
|
||||
|
||||
CommonStatsFlags.Flag[] setFlags = flags.getFlags();
|
||||
|
||||
for (CommonStatsFlags.Flag flag : setFlags) {
|
||||
|
@ -155,7 +160,7 @@ public class CommonStats implements Streamable, ToXContent {
|
|||
warmer = indexShard.warmerStats();
|
||||
break;
|
||||
case QueryCache:
|
||||
queryCache = indexShard.queryCacheStats();
|
||||
queryCache = indicesQueryCache.getStats(indexShard.shardId());
|
||||
break;
|
||||
case FieldData:
|
||||
fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
|
||||
|
|
|
@ -162,6 +162,6 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
|
|||
flags.set(CommonStatsFlags.Flag.Recovery);
|
||||
}
|
||||
|
||||
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats());
|
||||
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), indexShard.commitStats());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -258,4 +258,16 @@ final class CompositeIndexEventListener implements IndexEventListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStoreClosed(ShardId shardId) {
|
||||
for (IndexEventListener listener : listeners) {
|
||||
try {
|
||||
listener.onStoreClosed(shardId);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to invoke on store closed", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -241,7 +241,7 @@ public final class IndexModule {
|
|||
IndexSearcherWrapper newWrapper(final IndexService indexService);
|
||||
}
|
||||
|
||||
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache,
|
||||
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache,
|
||||
IndexingOperationListener... listeners) throws IOException {
|
||||
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get();
|
||||
IndexEventListener eventListener = freeze();
|
||||
|
@ -263,7 +263,7 @@ public final class IndexModule {
|
|||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate);
|
||||
final String queryCacheType = indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING);
|
||||
final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
|
||||
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache());
|
||||
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
|
||||
return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(),
|
||||
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, listeners);
|
||||
}
|
||||
|
|
|
@ -321,8 +321,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
warmer.warm(searcher, shard, IndexService.this.indexSettings, toLevel);
|
||||
}
|
||||
};
|
||||
|
||||
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId)));
|
||||
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
|
||||
if (useShadowEngine(primary, indexSettings)) {
|
||||
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer); // no indexing listeners - shadow engines don't index
|
||||
} else {
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -36,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
public final class NodeServicesProvider {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final IndicesQueryCache indicesQueryCache;
|
||||
private final BigArrays bigArrays;
|
||||
private final Client client;
|
||||
private final IndicesQueriesRegistry indicesQueriesRegistry;
|
||||
|
@ -44,9 +42,8 @@ public final class NodeServicesProvider {
|
|||
private final CircuitBreakerService circuitBreakerService;
|
||||
|
||||
@Inject
|
||||
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
public NodeServicesProvider(ThreadPool threadPool, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
this.threadPool = threadPool;
|
||||
this.indicesQueryCache = indicesQueryCache;
|
||||
this.bigArrays = bigArrays;
|
||||
this.client = client;
|
||||
this.indicesQueriesRegistry = indicesQueriesRegistry;
|
||||
|
@ -58,10 +55,6 @@ public final class NodeServicesProvider {
|
|||
return threadPool;
|
||||
}
|
||||
|
||||
public IndicesQueryCache getIndicesQueryCache() {
|
||||
return indicesQueryCache;
|
||||
}
|
||||
|
||||
public BigArrays getBigArrays() { return bigArrays; }
|
||||
|
||||
public Client getClient() {
|
||||
|
|
|
@ -177,4 +177,12 @@ public interface IndexEventListener {
|
|||
*/
|
||||
default void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the given shards store is closed. The store is closed once all resource have been released on the store.
|
||||
* This implies that all index readers are closed and no recoveries are running.
|
||||
*
|
||||
* @param shardId the shard ID the store belongs to
|
||||
*/
|
||||
default void onStoreClosed(ShardId shardId) {}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,6 @@ import org.elasticsearch.index.SearchSlowLog;
|
|||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
|
||||
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
||||
import org.elasticsearch.index.cache.request.ShardRequestCache;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.engine.CommitStats;
|
||||
|
@ -105,7 +104,6 @@ import org.elasticsearch.index.translog.TranslogStats;
|
|||
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
|
||||
import org.elasticsearch.index.warmer.WarmerStats;
|
||||
import org.elasticsearch.indices.IndexingMemoryController;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.percolator.PercolatorService;
|
||||
|
@ -154,7 +152,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
private final SimilarityService similarityService;
|
||||
private final EngineConfig engineConfig;
|
||||
private final TranslogConfig translogConfig;
|
||||
private final IndicesQueryCache indicesQueryCache;
|
||||
private final IndexEventListener indexEventListener;
|
||||
private final IndexSettings idxSettings;
|
||||
|
||||
|
@ -227,7 +224,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
this.getService = new ShardGetService(indexSettings, this, mapperService);
|
||||
this.searchService = new ShardSearchStats(slowLog);
|
||||
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
|
||||
this.indicesQueryCache = provider.getIndicesQueryCache();
|
||||
this.shardQueryCache = new ShardRequestCache(shardId, indexSettings);
|
||||
this.shardFieldData = new ShardFieldData();
|
||||
this.indexFieldDataService = indexFieldDataService;
|
||||
|
@ -652,10 +648,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
return shardWarmerService.stats();
|
||||
}
|
||||
|
||||
public QueryCacheStats queryCacheStats() {
|
||||
return indicesQueryCache.getStats(shardId);
|
||||
}
|
||||
|
||||
public FieldDataStats fieldDataStats(String... fields) {
|
||||
return shardFieldData.stats(fields);
|
||||
}
|
||||
|
|
|
@ -55,8 +55,6 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
|
|||
import org.elasticsearch.index.mapper.ip.IpFieldMapper;
|
||||
import org.elasticsearch.index.mapper.object.ObjectMapper;
|
||||
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.flush.SyncedFlushService;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
|
@ -160,8 +158,6 @@ public class IndicesModule extends AbstractModule {
|
|||
bind(IndicesStore.class).asEagerSingleton();
|
||||
bind(IndicesClusterStateService.class).asEagerSingleton();
|
||||
bind(SyncedFlushService.class).asEagerSingleton();
|
||||
bind(IndicesQueryCache.class).asEagerSingleton();
|
||||
bind(IndicesRequestCache.class).asEagerSingleton();
|
||||
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
|
||||
bind(IndicesTTLService.class).asEagerSingleton();
|
||||
bind(UpdateHelper.class).asEagerSingleton();
|
||||
|
|
|
@ -71,6 +71,8 @@ import org.elasticsearch.index.shard.IndexingStats;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
|
@ -124,6 +126,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
private final MapperRegistry mapperRegistry;
|
||||
private final IndexingMemoryController indexingMemoryController;
|
||||
private final TimeValue cleanInterval;
|
||||
private final IndicesRequestCache indicesRequestCache;
|
||||
private final IndicesQueryCache indicesQueryCache;
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
|
@ -146,6 +150,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
this.indicesQueriesRegistry = indicesQueriesRegistry;
|
||||
this.clusterService = clusterService;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.indicesRequestCache = new IndicesRequestCache(settings, threadPool);
|
||||
this.indicesQueryCache = new IndicesQueryCache(settings);
|
||||
this.mapperRegistry = mapperRegistry;
|
||||
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType);
|
||||
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
|
||||
|
@ -196,7 +202,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner);
|
||||
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner, indicesRequestCache, indicesQueryCache);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -247,7 +253,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
if (indexShard.routingEntry() == null) {
|
||||
continue;
|
||||
}
|
||||
IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()) });
|
||||
IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indicesQueryCache, indexShard, flags), indexShard.commitStats()) });
|
||||
if (!statsByShard.containsKey(indexService.index())) {
|
||||
statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
|
||||
} else {
|
||||
|
@ -348,10 +354,17 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
for (IndexEventListener listener : builtInListeners) {
|
||||
indexModule.addIndexEventListener(listener);
|
||||
}
|
||||
final IndexEventListener onStoreClose = new IndexEventListener() {
|
||||
@Override
|
||||
public void onStoreClosed(ShardId shardId) {
|
||||
indicesQueryCache.onClose(shardId);
|
||||
}
|
||||
};
|
||||
indexModule.addIndexEventListener(onStoreClose);
|
||||
indexModule.addIndexEventListener(oldShardsStats);
|
||||
final IndexEventListener listener = indexModule.freeze();
|
||||
listener.beforeIndexCreated(index, idxSettings.getSettings());
|
||||
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
|
||||
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
|
||||
boolean success = false;
|
||||
try {
|
||||
assert indexService.getIndexEventListener() == listener;
|
||||
|
@ -420,6 +433,14 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
return circuitBreakerService;
|
||||
}
|
||||
|
||||
public IndicesRequestCache getIndicesRequestCache() {
|
||||
return indicesRequestCache;
|
||||
}
|
||||
|
||||
public IndicesQueryCache getIndicesQueryCache() {
|
||||
return indicesQueryCache;
|
||||
}
|
||||
|
||||
static class OldShardsStats implements IndexEventListener {
|
||||
|
||||
final SearchStats searchStats = new SearchStats();
|
||||
|
|
|
@ -64,7 +64,6 @@ public class IndicesQueryCache extends AbstractComponent implements QueryCache,
|
|||
// See onDocIdSetEviction for more info
|
||||
private final Map<Object, StatsAndCount> stats2 = new IdentityHashMap<>();
|
||||
|
||||
@Inject
|
||||
public IndicesQueryCache(Settings settings) {
|
||||
super(settings);
|
||||
final ByteSizeValue size = INDICES_CACHE_QUERY_SIZE_SETTING.get(settings);
|
||||
|
|
|
@ -45,6 +45,8 @@ import org.elasticsearch.common.unit.MemorySizeValue;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
@ -53,6 +55,7 @@ import org.elasticsearch.search.query.QueryPhase;
|
|||
import org.elasticsearch.search.query.QuerySearchResult;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
|
@ -74,7 +77,7 @@ import java.util.concurrent.TimeUnit;
|
|||
* There are still several TODOs left in this class, some easily addressable, some more complex, but the support
|
||||
* is functional.
|
||||
*/
|
||||
public class IndicesRequestCache extends AbstractComponent implements RemovalListener<IndicesRequestCache.Key, IndicesRequestCache.Value> {
|
||||
public class IndicesRequestCache extends AbstractComponent implements RemovalListener<IndicesRequestCache.Key, IndicesRequestCache.Value>, Closeable {
|
||||
|
||||
/**
|
||||
* A setting to enable or disable request caching on an index level. Its dynamic by default
|
||||
|
@ -89,7 +92,6 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
|||
private static final Set<SearchType> CACHEABLE_SEARCH_TYPES = EnumSet.of(SearchType.QUERY_THEN_FETCH, SearchType.QUERY_AND_FETCH);
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final TimeValue cleanInterval;
|
||||
private final Reaper reaper;
|
||||
|
@ -104,18 +106,13 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
|||
|
||||
private volatile Cache<Key, Value> cache;
|
||||
|
||||
@Inject
|
||||
public IndicesRequestCache(Settings settings, ClusterService clusterService, ThreadPool threadPool) {
|
||||
public IndicesRequestCache(Settings settings, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
this.cleanInterval = INDICES_CACHE_REQUEST_CLEAN_INTERVAL.get(settings);
|
||||
|
||||
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
|
||||
|
||||
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
|
||||
buildCache();
|
||||
|
||||
this.reaper = new Reaper();
|
||||
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, reaper);
|
||||
}
|
||||
|
@ -123,10 +120,8 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
|||
|
||||
private void buildCache() {
|
||||
long sizeInBytes = size.bytes();
|
||||
|
||||
CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.<Key, Value>builder()
|
||||
.setMaximumWeight(sizeInBytes).weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()).removalListener(this);
|
||||
// cacheBuilder.concurrencyLevel(concurrencyLevel);
|
||||
|
||||
if (expire != null) {
|
||||
cacheBuilder.setExpireAfterAccess(TimeUnit.MILLISECONDS.toNanos(expire.millis()));
|
||||
|
@ -135,6 +130,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
|||
cache = cacheBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
reaper.close();
|
||||
cache.invalidateAll();
|
||||
|
@ -174,21 +170,17 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
|||
if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
IndexMetaData index = clusterService.state().getMetaData().index(request.index());
|
||||
if (index == null) { // in case we didn't yet have the cluster state, or it just got deleted
|
||||
return false;
|
||||
}
|
||||
IndexSettings settings = context.indexShard().getIndexSettings();
|
||||
// if not explicitly set in the request, use the index setting, if not, use the request
|
||||
if (request.requestCache() == null) {
|
||||
if (INDEX_CACHE_REQUEST_ENABLED_SETTING.get(index.getSettings()) == false) {
|
||||
if (settings.getValue(INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {
|
||||
return false;
|
||||
}
|
||||
} else if (!request.requestCache()) {
|
||||
} else if (request.requestCache() == false) {
|
||||
return false;
|
||||
}
|
||||
// if the reader is not a directory reader, we can't get the version from it
|
||||
if (!(context.searcher().getIndexReader() instanceof DirectoryReader)) {
|
||||
if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) {
|
||||
return false;
|
||||
}
|
||||
// if now in millis is used (or in the future, a more generic "isDeterministic" flag
|
||||
|
|
|
@ -70,9 +70,7 @@ import org.elasticsearch.indices.IndicesModule;
|
|||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.analysis.AnalysisModule;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.indices.store.IndicesStore;
|
||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||
import org.elasticsearch.monitor.MonitorService;
|
||||
|
@ -391,7 +389,6 @@ public class Node implements Closeable {
|
|||
toClose.add(injector.getInstance(IndicesTTLService.class));
|
||||
toClose.add(injector.getInstance(IndicesService.class));
|
||||
// close filter/fielddata caches after indices
|
||||
toClose.add(injector.getInstance(IndicesQueryCache.class));
|
||||
toClose.add(injector.getInstance(IndicesStore.class));
|
||||
toClose.add(() ->stopWatch.stop().start("routing"));
|
||||
toClose.add(injector.getInstance(RoutingService.class));
|
||||
|
|
|
@ -149,8 +149,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
|
||||
@Inject
|
||||
public SearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
|
||||
IndicesRequestCache indicesQueryCache) {
|
||||
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
|
||||
super(settings);
|
||||
this.parseFieldMatcher = new ParseFieldMatcher(settings);
|
||||
this.threadPool = threadPool;
|
||||
|
@ -162,7 +161,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
this.dfsPhase = dfsPhase;
|
||||
this.queryPhase = queryPhase;
|
||||
this.fetchPhase = fetchPhase;
|
||||
this.indicesQueryCache = indicesQueryCache;
|
||||
this.indicesQueryCache = indicesService.getIndicesRequestCache();
|
||||
|
||||
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
|
||||
this.defaultKeepAlive = DEFAULT_KEEPALIVE_SETTING.get(settings).millis();
|
||||
|
|
|
@ -88,6 +88,8 @@ public class IndexModuleTests extends ESTestCase {
|
|||
private Environment environment;
|
||||
private NodeEnvironment nodeEnvironment;
|
||||
private NodeServicesProvider nodeServicesProvider;
|
||||
private IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
|
||||
|
||||
private IndexService.ShardStoreDeleter deleter = new IndexService.ShardStoreDeleter() {
|
||||
@Override
|
||||
public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexSettings) throws IOException {
|
||||
|
@ -103,7 +105,6 @@ public class IndexModuleTests extends ESTestCase {
|
|||
static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException {
|
||||
// TODO this can be used in other place too - lets first refactor the IndicesQueriesRegistry
|
||||
ThreadPool threadPool = new ThreadPool("test");
|
||||
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
|
||||
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
||||
PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool);
|
||||
BigArrays bigArrays = new BigArrays(recycler, circuitBreakerService);
|
||||
|
@ -114,7 +115,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
|
||||
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
|
||||
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap());
|
||||
return new NodeServicesProvider(threadPool, indicesQueryCache, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService);
|
||||
return new NodeServicesProvider(threadPool, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -143,7 +144,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
|
||||
module.setSearcherWrapper((s) -> new Wrapper());
|
||||
module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener));
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
|
||||
assertSame(indexService.getEngineFactory(), module.engineFactory.get());
|
||||
indexService.close("simon says", false);
|
||||
|
@ -158,10 +159,9 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
|
||||
.build();
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
|
||||
final Index index = indexSettings.getIndex();
|
||||
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
|
||||
module.addIndexStore("foo_store", FooStore::new);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, new IndicesFieldDataCache(settings, listener));
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.getIndexStore() instanceof FooStore);
|
||||
try {
|
||||
module.addIndexStore("foo_store", FooStore::new);
|
||||
|
@ -184,7 +184,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
|
||||
Consumer<Settings> listener = (s) -> {};
|
||||
module.addIndexEventListener(eventListener);
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, this.listener));
|
||||
IndexSettings x = indexService.getIndexSettings();
|
||||
assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap());
|
||||
|
@ -209,7 +209,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
|
||||
}
|
||||
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
assertSame(booleanSetting, indexService.getIndexSettings().getScopedSettings().get(booleanSetting.getKey()));
|
||||
|
||||
|
@ -236,7 +236,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
}
|
||||
});
|
||||
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
SimilarityService similarityService = indexService.similarityService();
|
||||
assertNotNull(similarityService.getSimilarity("my_similarity"));
|
||||
|
@ -254,7 +254,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
|
||||
try {
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
|
||||
|
@ -269,7 +269,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
|
||||
try {
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
|
||||
|
@ -317,7 +317,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
assertEquals(e.getMessage(), "Can't register the same [query_cache] more than once for [custom]");
|
||||
}
|
||||
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
|
||||
indexService.close("simon says", false);
|
||||
|
@ -328,7 +328,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment));
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry,
|
||||
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
|
||||
new IndicesFieldDataCache(settings, listener));
|
||||
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
|
||||
indexService.close("simon says", false);
|
||||
|
|
|
@ -564,7 +564,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService("test");
|
||||
IndexShard shard = test.getShardOrNull(0);
|
||||
ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(shard, new CommonStatsFlags()), shard.commitStats());
|
||||
ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), shard, new CommonStatsFlags()), shard.commitStats());
|
||||
assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath());
|
||||
assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath());
|
||||
assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath());
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.dfs.DfsPhase;
|
||||
|
@ -68,9 +67,9 @@ public class MockSearchService extends SearchService {
|
|||
@Inject
|
||||
public MockSearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService,
|
||||
ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
|
||||
DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesRequestCache indicesQueryCache) {
|
||||
DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
|
||||
super(settings, clusterSettings, clusterService, indicesService, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
|
||||
queryPhase, fetchPhase, indicesQueryCache);
|
||||
queryPhase, fetchPhase);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue