Merge pull request #16610 from s1monw/test_indices_request_cache

Refactor IndicesRequestCache to make it testable.
This commit is contained in:
Simon Willnauer 2016-02-12 09:27:20 +01:00
commit b5aee2075f
29 changed files with 934 additions and 526 deletions

View File

@ -649,8 +649,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analysis[/\\]PreBuiltCacheFactory.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analysis[/\\]PreBuiltTokenFilters.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]breaker[/\\]HierarchyCircuitBreakerService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]cache[/\\]query[/\\]terms[/\\]TermsLookup.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]cache[/\\]request[/\\]IndicesRequestCache.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]cluster[/\\]IndicesClusterStateService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]fielddata[/\\]cache[/\\]IndicesFieldDataCache.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]fielddata[/\\]cache[/\\]IndicesFieldDataCacheListener.java" checks="LineLength" />
@ -1306,7 +1304,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analysis[/\\]PreBuiltAnalyzerIntegrationIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analyze[/\\]AnalyzeActionIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analyze[/\\]HunspellServiceIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]cache[/\\]query[/\\]IndicesRequestCacheIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]exists[/\\]indices[/\\]IndicesExistsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]exists[/\\]types[/\\]TypesExistsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]flush[/\\]FlushIT.java" checks="LineLength" />

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -98,7 +97,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc
}
if (request.requestCache()) {
clearedAtLeastOne = true;
indicesService.getIndicesRequestCache().clear(shard);
indicesService.clearRequestCache(shard);
}
if (request.recycler()) {
logger.debug("Clear CacheRecycler on index [{}]", service.index());
@ -114,7 +113,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc
} else {
service.cache().clear("api");
service.fieldData().clear();
indicesService.getIndicesRequestCache().clear(shard);
indicesService.clearRequestCache(shard);
}
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.cache.request.RequestCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
@ -41,13 +40,11 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.suggest.stats.SuggestStats;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import java.io.IOException;

View File

@ -65,8 +65,8 @@ import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.HunspellService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
@ -307,11 +307,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
ScriptService.SCRIPT_CACHE_SIZE_SETTING,
ScriptService.SCRIPT_CACHE_EXPIRE_SETTING,
ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING,
IndicesService.INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING,
IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING,
IndicesFieldDataCache.INDICES_FIELDDATA_CACHE_SIZE_KEY,
IndicesRequestCache.INDICES_CACHE_QUERY_SIZE,
IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE,
IndicesRequestCache.INDICES_CACHE_REQUEST_CLEAN_INTERVAL,
HunspellService.HUNSPELL_LAZY_LOAD,
HunspellService.HUNSPELL_IGNORE_CASE,
HunspellService.HUNSPELL_DICTIONARY_OPTIONS,

View File

@ -39,7 +39,7 @@ import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.IndexWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.IndicesRequestCache;
import java.util.Arrays;
import java.util.Collections;

View File

@ -37,7 +37,7 @@ import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;

View File

@ -25,7 +25,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.IndicesQueryCache;
/**
* The index-level query cache. This class mostly delegates to the node-level

View File

@ -19,27 +19,24 @@
package org.elasticsearch.index.cache.request;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.IndicesRequestCache;
/**
*/
public class ShardRequestCache extends AbstractIndexShardComponent implements RemovalListener<IndicesRequestCache.Key, IndicesRequestCache.Value> {
public final class ShardRequestCache {
final CounterMetric evictionsMetric = new CounterMetric();
final CounterMetric totalMetric = new CounterMetric();
final CounterMetric hitCount = new CounterMetric();
final CounterMetric missCount = new CounterMetric();
public ShardRequestCache(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
}
public RequestCacheStats stats() {
return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count());
}
@ -52,21 +49,20 @@ public class ShardRequestCache extends AbstractIndexShardComponent implements Re
missCount.inc();
}
public void onCached(IndicesRequestCache.Key key, IndicesRequestCache.Value value) {
public void onCached(Accountable key, Accountable value) {
totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed());
}
@Override
public void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> removalNotification) {
if (removalNotification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED) {
public void onRemoval(Accountable key, Accountable value, boolean evicted) {
if (evicted) {
evictionsMetric.inc();
}
long dec = 0;
if (removalNotification.getKey() != null) {
dec += removalNotification.getKey().ramBytesUsed();
if (key != null) {
dec += key.ramBytesUsed();
}
if (removalNotification.getValue() != null) {
dec += removalNotification.getValue().ramBytesUsed();
if (value != null) {
dec += value.ramBytesUsed();
}
totalMetric.dec(dec);
}

View File

@ -27,7 +27,7 @@ import org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.search.MatchQuery;
import org.elasticsearch.indices.cache.query.terms.TermsLookup;
import org.elasticsearch.indices.TermsLookup;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.Template;

View File

@ -37,8 +37,7 @@ import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.indices.cache.query.terms.TermsLookup;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.indices.TermsLookup;
import java.io.IOException;
import java.util.ArrayList;

View File

@ -21,7 +21,7 @@ package org.elasticsearch.index.query;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.indices.cache.query.terms.TermsLookup;
import org.elasticsearch.indices.TermsLookup;
import java.io.IOException;
import java.util.ArrayList;

View File

@ -224,7 +224,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.getService = new ShardGetService(indexSettings, this, mapperService);
this.searchService = new ShardSearchStats(slowLog);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
this.shardQueryCache = new ShardRequestCache(shardId, indexSettings);
this.shardQueryCache = new ShardRequestCache();
this.shardFieldData = new ShardFieldData();
this.indexFieldDataService = indexFieldDataService;
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.indices.cache.query;
package org.elasticsearch.indices;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;

View File

@ -0,0 +1,337 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.CacheLoader;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* The indices request cache allows to cache a shard level request stage responses, helping with improving
* similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent
* with the semantics of NRT (the index reader version is part of the cache key), and relies on size based
* eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that
* are no longer used or closed shards.
* <p>
* Currently, the cache is only enabled for count requests, and can only be opted in on an index
* level setting that can be dynamically changed and defaults to false.
* <p>
* There are still several TODOs left in this class, some easily addressable, some more complex, but the support
* is functional.
*/
public final class IndicesRequestCache extends AbstractComponent implements RemovalListener<IndicesRequestCache.Key,
IndicesRequestCache.Value>, Closeable {
/**
* A setting to enable or disable request caching on an index level. Its dynamic by default
* since we are checking on the cluster state IndexMetaData always.
*/
public static final Setting<Boolean> INDEX_CACHE_REQUEST_ENABLED_SETTING = Setting.boolSetting("index.requests.cache.enable",
false, true, Setting.Scope.INDEX);
public static final Setting<ByteSizeValue> INDICES_CACHE_QUERY_SIZE = Setting.byteSizeSetting("indices.requests.cache.size", "1%",
false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> INDICES_CACHE_QUERY_EXPIRE = Setting.positiveTimeSetting("indices.requests.cache.expire",
new TimeValue(0), false, Setting.Scope.CLUSTER);
private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
private final ByteSizeValue size;
private final TimeValue expire;
private final Cache<Key, Value> cache;
IndicesRequestCache(Settings settings) {
super(settings);
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.bytes();
CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.<Key, Value>builder()
.setMaximumWeight(sizeInBytes).weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()).removalListener(this);
if (expire != null) {
cacheBuilder.setExpireAfterAccess(TimeUnit.MILLISECONDS.toNanos(expire.millis()));
}
cache = cacheBuilder.build();
}
@Override
public void close() {
cache.invalidateAll();
}
void clear(CacheEntity entity) {
keysToClean.add(new CleanupKey(entity, -1));
cleanCache();
}
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
notification.getKey().entity.onRemoval(notification);
}
BytesReference getOrCompute(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) throws Exception {
final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
Loader loader = new Loader(cacheEntity);
Value value = cache.computeIfAbsent(key, loader);
if (loader.isLoaded()) {
key.entity.onMiss();
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
if (previous == null) {
ElasticsearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
}
}
} else {
key.entity.onHit();
}
return value.reference;
}
private static class Loader implements CacheLoader<Key, Value> {
private final CacheEntity entity;
private boolean loaded;
Loader(CacheEntity entity) {
this.entity = entity;
}
public boolean isLoaded() {
return this.loaded;
}
@Override
public Value load(Key key) throws Exception {
Value value = entity.loadValue();
entity.onCached(key, value);
loaded = true;
return value;
}
}
/**
* Basic interface to make this cache testable.
*/
interface CacheEntity {
/**
* Loads the actual cache value. this is the heavy lifting part.
*/
Value loadValue() throws IOException;
/**
* Called after the value was loaded via {@link #loadValue()}
*/
void onCached(Key key, Value value);
/**
* Returns <code>true</code> iff the resource behind this entity is still open ie.
* entities assiciated with it can remain in the cache. ie. IndexShard is still open.
*/
boolean isOpen();
/**
* Returns the cache identity. this is, similar to {@link #isOpen()} the resource identity behind this cache entity.
* For instance IndexShard is the identity while a CacheEntity is per DirectoryReader. Yet, we group by IndexShard instance.
*/
Object getCacheIdentity();
/**
* Called each time this entity has a cache hit.
*/
void onHit();
/**
* Called each time this entity has a cache miss.
*/
void onMiss();
/**
* Called when this entity instance is removed
*/
void onRemoval(RemovalNotification<Key, Value> notification);
}
static class Value implements Accountable {
final BytesReference reference;
final long ramBytesUsed;
Value(BytesReference reference, long ramBytesUsed) {
this.reference = reference;
this.ramBytesUsed = ramBytesUsed;
}
@Override
public long ramBytesUsed() {
return ramBytesUsed;
}
@Override
public Collection<Accountable> getChildResources() {
return Collections.emptyList();
}
}
static class Key implements Accountable {
public final CacheEntity entity; // use as identity equality
public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
public final BytesReference value;
Key(CacheEntity entity, long readerVersion, BytesReference value) {
this.entity = entity;
this.readerVersion = readerVersion;
this.value = value;
}
@Override
public long ramBytesUsed() {
return RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_LONG + value.length();
}
@Override
public Collection<Accountable> getChildResources() {
// TODO: more detailed ram usage?
return Collections.emptyList();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
Key key = (Key) o;
if (readerVersion != key.readerVersion) return false;
if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false;
if (!value.equals(key.value)) return false;
return true;
}
@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + Long.hashCode(readerVersion);
result = 31 * result + value.hashCode();
return result;
}
}
private class CleanupKey implements IndexReader.ReaderClosedListener {
final CacheEntity entity;
final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
private CleanupKey(CacheEntity entity, long readerVersion) {
this.entity = entity;
this.readerVersion = readerVersion;
}
@Override
public void onClose(IndexReader reader) {
Boolean remove = registeredClosedListeners.remove(this);
if (remove != null) {
keysToClean.add(this);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
CleanupKey that = (CleanupKey) o;
if (readerVersion != that.readerVersion) return false;
if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false;
return true;
}
@Override
public int hashCode() {
int result = entity.getCacheIdentity().hashCode();
result = 31 * result + Long.hashCode(readerVersion);
return result;
}
}
synchronized void cleanCache() {
final ObjectSet<CleanupKey> currentKeysToClean = new ObjectHashSet<>();
final ObjectSet<Object> currentFullClean = new ObjectHashSet<>();
currentKeysToClean.clear();
currentFullClean.clear();
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerVersion == -1 || cleanupKey.entity.isOpen() == false) {
// -1 indicates full cleanup, as does a closed shard
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
currentKeysToClean.add(cleanupKey);
}
}
if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) {
for (Iterator<Key> iterator = cache.keys().iterator(); iterator.hasNext(); ) {
Key key = iterator.next();
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
} else {
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerVersion))) {
iterator.remove();
}
}
}
}
cache.refresh();
}
/**
* Returns the current size of the cache
*/
final int count() {
return cache.count();
}
final int numRegisteredCloseListeners() { // for testing
return registeredClosedListeners.size();
}
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.indices;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
@ -29,15 +29,19 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ClusterSettings;
@ -56,6 +60,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
@ -67,21 +72,25 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -106,7 +115,7 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
public class IndicesService extends AbstractLifecycleComponent<IndicesService> implements Iterable<IndexService>, IndexService.ShardStoreDeleter {
public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> INDICES_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
private final PluginsService pluginsService;
private final NodeEnvironment nodeEnv;
private final TimeValue shardsClosedTimeout;
@ -116,7 +125,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final IndexScopedSettings indexScopeSetting;
private final IndicesFieldDataCache indicesFieldDataCache;
private final FieldDataCacheCleaner fieldDataCacheCleaner;
private final CacheCleaner cacheCleaner;
private final ThreadPool threadPool;
private final CircuitBreakerService circuitBreakerService;
private volatile Map<String, IndexService> indices = emptyMap();
@ -132,7 +141,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.fieldDataCacheCleaner);
threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.cacheCleaner);
}
@Inject
@ -150,7 +159,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
this.indicesQueriesRegistry = indicesQueriesRegistry;
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indicesRequestCache = new IndicesRequestCache(settings, threadPool);
this.indicesRequestCache = new IndicesRequestCache(settings);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType);
@ -165,8 +174,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}
});
this.cleanInterval = INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING.get(settings);
this.fieldDataCacheCleaner = new FieldDataCacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval);
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
}
@ -202,7 +211,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
@Override
protected void doClose() {
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, fieldDataCacheCleaner, indicesRequestCache, indicesQueryCache);
IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController, indicesFieldDataCache, cacheCleaner, indicesRequestCache, indicesQueryCache);
}
/**
@ -433,10 +442,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
return circuitBreakerService;
}
public IndicesRequestCache getIndicesRequestCache() {
return indicesRequestCache;
}
public IndicesQueryCache getIndicesQueryCache() {
return indicesQueryCache;
}
@ -827,16 +832,18 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* has an entry invalidated may not clean up the entry if it is not read from
* or written to after invalidation.
*/
private final static class FieldDataCacheCleaner implements Runnable, Releasable {
private final static class CacheCleaner implements Runnable, Releasable {
private final IndicesFieldDataCache cache;
private final ESLogger logger;
private final ThreadPool threadPool;
private final TimeValue interval;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final IndicesRequestCache requestCache;
public FieldDataCacheCleaner(IndicesFieldDataCache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) {
public CacheCleaner(IndicesFieldDataCache cache, IndicesRequestCache requestCache, ESLogger logger, ThreadPool threadPool, TimeValue interval) {
this.cache = cache;
this.requestCache = requestCache;
this.logger = logger;
this.threadPool = threadPool;
this.interval = interval;
@ -856,6 +863,12 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
if (logger.isTraceEnabled()) {
logger.trace("periodic field data cache cleanup finished in {} milliseconds", TimeValue.nsecToMSec(System.nanoTime() - startTimeNS));
}
try {
this.requestCache.cleanCache();
} catch (Exception e) {
logger.warn("Exception during periodic request cache cleanup:", e);
}
// Reschedule itself to run again if not closed
if (closed.get() == false) {
threadPool.schedule(interval, ThreadPool.Names.SAME, this);
@ -867,4 +880,148 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
closed.compareAndSet(false, true);
}
}
private static final Set<SearchType> CACHEABLE_SEARCH_TYPES = EnumSet.of(SearchType.QUERY_THEN_FETCH, SearchType.QUERY_AND_FETCH);
/**
* Can the shard request be cached at all?
*/
public boolean canCache(ShardSearchRequest request, SearchContext context) {
if (request.template() != null) {
return false;
}
// for now, only enable it for requests with no hits
if (context.size() != 0) {
return false;
}
// We cannot cache with DFS because results depend not only on the content of the index but also
// on the overridden statistics. So if you ran two queries on the same index with different stats
// (because an other shard was updated) you would get wrong results because of the scores
// (think about top_hits aggs or scripts using the score)
if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
return false;
}
IndexSettings settings = context.indexShard().getIndexSettings();
// if not explicitly set in the request, use the index setting, if not, use the request
if (request.requestCache() == null) {
if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {
return false;
}
} else if (request.requestCache() == false) {
return false;
}
// if the reader is not a directory reader, we can't get the version from it
if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) {
return false;
}
// if now in millis is used (or in the future, a more generic "isDeterministic" flag
// then we can't cache based on "now" key within the search request, as it is not deterministic
if (context.nowInMillisUsed()) {
return false;
}
return true;
}
public void clearRequestCache(IndexShard shard) {
if (shard == null) {
return;
}
indicesRequestCache.clear(new IndexShardCacheEntity(shard));
logger.trace("{} explicit cache clear", shard.shardId());
}
/**
* Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached
* value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows
* to have a single load operation that will cause other requests with the same key to wait till its loaded an reuse
* the same cache.
*/
public void loadIntoContext(ShardSearchRequest request, SearchContext context, QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
final IndexShardCacheEntity entity = new IndexShardCacheEntity(context.indexShard(), queryPhase, context);
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();
final BytesReference bytesReference = indicesRequestCache.getOrCompute(entity, directoryReader, request.cacheKey());
if (entity.loaded == false) { // if we have loaded this we don't need to do anything
// restore the cached query result into the context
final QuerySearchResult result = context.queryResult();
result.readFromWithId(context.id(), bytesReference.streamInput());
result.shardTarget(context.shardTarget());
}
}
static final class IndexShardCacheEntity implements IndicesRequestCache.CacheEntity {
private final QueryPhase queryPhase;
private final SearchContext context;
private final IndexShard indexShard;
private final ShardRequestCache requestCache;
private boolean loaded = false;
IndexShardCacheEntity(IndexShard indexShard) {
this(indexShard, null, null);
}
public IndexShardCacheEntity(IndexShard indexShard, QueryPhase queryPhase, SearchContext context) {
this.queryPhase = queryPhase;
this.context = context;
this.indexShard = indexShard;
this.requestCache = indexShard.requestCache();
}
@Override
public IndicesRequestCache.Value loadValue() throws IOException {
queryPhase.execute(context);
/* BytesStreamOutput allows to pass the expected size but by default uses
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful
* since we don't shrink to the actual size once we are done serializing.
* By passing 512 as the expected size we will resize the byte array in the stream
* slowly until we hit the page size and don't waste too much memory for small query
* results.*/
final int expectedSizeInBytes = 512;
try (BytesStreamOutput out = new BytesStreamOutput(expectedSizeInBytes)) {
context.queryResult().writeToNoId(out);
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
final BytesReference reference = out.bytes();
loaded = true;
return new IndicesRequestCache.Value(reference, out.ramBytesUsed());
}
}
@Override
public void onCached(IndicesRequestCache.Key key, IndicesRequestCache.Value value) {
requestCache.onCached(key, value);
}
@Override
public boolean isOpen() {
return indexShard.state() != IndexShardState.CLOSED;
}
@Override
public Object getCacheIdentity() {
return indexShard;
}
@Override
public void onHit() {
requestCache.onHit();
}
@Override
public void onMiss() {
requestCache.onMiss();
}
@Override
public void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> notification) {
requestCache.onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED);
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.indices.cache.query.terms;
package org.elasticsearch.indices;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput;
@ -123,11 +123,12 @@ public class TermsLookup implements Writeable<TermsLookup>, ToXContent {
path = parser.text();
break;
default:
throw new ParsingException(parser.getTokenLocation(), "[" + TermsQueryBuilder.NAME + "] query does not support [" + currentFieldName
+ "] within lookup element");
throw new ParsingException(parser.getTokenLocation(), "[" + TermsQueryBuilder.NAME +
"] query does not support [" + currentFieldName + "] within lookup element");
}
} else {
throw new ParsingException(parser.getTokenLocation(), "[" + TermsQueryBuilder.NAME + "] unknown token [" + token + "] after [" + currentFieldName + "]");
throw new ParsingException(parser.getTokenLocation(), "[" + TermsQueryBuilder.NAME + "] unknown token ["
+ token + "] after [" + currentFieldName + "]");
}
}
return new TermsLookup(index, type, id, path).routing(routing);

View File

@ -1,443 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cache.request;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.CacheLoader;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* The indices request cache allows to cache a shard level request stage responses, helping with improving
* similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent
* with the semantics of NRT (the index reader version is part of the cache key), and relies on size based
* eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that
* are no longer used or closed shards.
* <p>
* Currently, the cache is only enabled for count requests, and can only be opted in on an index
* level setting that can be dynamically changed and defaults to false.
* <p>
* There are still several TODOs left in this class, some easily addressable, some more complex, but the support
* is functional.
*/
public class IndicesRequestCache extends AbstractComponent implements RemovalListener<IndicesRequestCache.Key, IndicesRequestCache.Value>, Closeable {
/**
* A setting to enable or disable request caching on an index level. Its dynamic by default
* since we are checking on the cluster state IndexMetaData always.
*/
public static final Setting<Boolean> INDEX_CACHE_REQUEST_ENABLED_SETTING = Setting.boolSetting("index.requests.cache.enable", false, true, Setting.Scope.INDEX);
public static final Setting<TimeValue> INDICES_CACHE_REQUEST_CLEAN_INTERVAL = Setting.positiveTimeSetting("indices.requests.cache.clean_interval", TimeValue.timeValueSeconds(60), false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> INDICES_CACHE_QUERY_SIZE = Setting.byteSizeSetting("indices.requests.cache.size", "1%", false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> INDICES_CACHE_QUERY_EXPIRE = Setting.positiveTimeSetting("indices.requests.cache.expire", new TimeValue(0), false, Setting.Scope.CLUSTER);
private static final Set<SearchType> CACHEABLE_SEARCH_TYPES = EnumSet.of(SearchType.QUERY_THEN_FETCH, SearchType.QUERY_AND_FETCH);
private final ThreadPool threadPool;
private final TimeValue cleanInterval;
private final Reaper reaper;
final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
//TODO make these changes configurable on the cluster level
private final ByteSizeValue size;
private final TimeValue expire;
private volatile Cache<Key, Value> cache;
public IndicesRequestCache(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
this.cleanInterval = INDICES_CACHE_REQUEST_CLEAN_INTERVAL.get(settings);
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
buildCache();
this.reaper = new Reaper();
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, reaper);
}
private void buildCache() {
long sizeInBytes = size.bytes();
CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.<Key, Value>builder()
.setMaximumWeight(sizeInBytes).weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()).removalListener(this);
if (expire != null) {
cacheBuilder.setExpireAfterAccess(TimeUnit.MILLISECONDS.toNanos(expire.millis()));
}
cache = cacheBuilder.build();
}
@Override
public void close() {
reaper.close();
cache.invalidateAll();
}
public void clear(IndexShard shard) {
if (shard == null) {
return;
}
keysToClean.add(new CleanupKey(shard, -1));
logger.trace("{} explicit cache clear", shard.shardId());
reaper.reap();
}
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
notification.getKey().shard.requestCache().onRemoval(notification);
}
/**
* Can the shard request be cached at all?
*/
public boolean canCache(ShardSearchRequest request, SearchContext context) {
if (request.template() != null) {
return false;
}
// for now, only enable it for requests with no hits
if (context.size() != 0) {
return false;
}
// We cannot cache with DFS because results depend not only on the content of the index but also
// on the overridden statistics. So if you ran two queries on the same index with different stats
// (because an other shard was updated) you would get wrong results because of the scores
// (think about top_hits aggs or scripts using the score)
if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
return false;
}
IndexSettings settings = context.indexShard().getIndexSettings();
// if not explicitly set in the request, use the index setting, if not, use the request
if (request.requestCache() == null) {
if (settings.getValue(INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {
return false;
}
} else if (request.requestCache() == false) {
return false;
}
// if the reader is not a directory reader, we can't get the version from it
if ((context.searcher().getIndexReader() instanceof DirectoryReader) == false) {
return false;
}
// if now in millis is used (or in the future, a more generic "isDeterministic" flag
// then we can't cache based on "now" key within the search request, as it is not deterministic
if (context.nowInMillisUsed()) {
return false;
}
return true;
}
/**
* Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached
* value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows
* to have a single load operation that will cause other requests with the same key to wait till its loaded an reuse
* the same cache.
*/
public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
Key key = buildKey(request, context);
Loader loader = new Loader(queryPhase, context);
Value value = cache.computeIfAbsent(key, loader);
if (loader.isLoaded()) {
key.shard.requestCache().onMiss();
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(context.indexShard(), ((DirectoryReader) context.searcher().getIndexReader()).getVersion());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
if (previous == null) {
ElasticsearchDirectoryReader.addReaderCloseListener(context.searcher().getDirectoryReader(), cleanupKey);
}
}
} else {
key.shard.requestCache().onHit();
// restore the cached query result into the context
final QuerySearchResult result = context.queryResult();
result.readFromWithId(context.id(), value.reference.streamInput());
result.shardTarget(context.shardTarget());
}
}
private static class Loader implements CacheLoader<Key, Value> {
private final QueryPhase queryPhase;
private final SearchContext context;
private boolean loaded;
Loader(QueryPhase queryPhase, SearchContext context) {
this.queryPhase = queryPhase;
this.context = context;
}
public boolean isLoaded() {
return this.loaded;
}
@Override
public Value load(Key key) throws Exception {
queryPhase.execute(context);
/* BytesStreamOutput allows to pass the expected size but by default uses
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful
* since we don't shrink to the actual size once we are done serializing.
* By passing 512 as the expected size we will resize the byte array in the stream
* slowly until we hit the page size and don't waste too much memory for small query
* results.*/
final int expectedSizeInBytes = 512;
try (BytesStreamOutput out = new BytesStreamOutput(expectedSizeInBytes)) {
context.queryResult().writeToNoId(out);
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
final BytesReference reference = out.bytes();
loaded = true;
Value value = new Value(reference, out.ramBytesUsed());
key.shard.requestCache().onCached(key, value);
return value;
}
}
}
public static class Value implements Accountable {
final BytesReference reference;
final long ramBytesUsed;
public Value(BytesReference reference, long ramBytesUsed) {
this.reference = reference;
this.ramBytesUsed = ramBytesUsed;
}
@Override
public long ramBytesUsed() {
return ramBytesUsed;
}
@Override
public Collection<Accountable> getChildResources() {
return Collections.emptyList();
}
}
public static class Key implements Accountable {
public final IndexShard shard; // use as identity equality
public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
public final BytesReference value;
Key(IndexShard shard, long readerVersion, BytesReference value) {
this.shard = shard;
this.readerVersion = readerVersion;
this.value = value;
}
@Override
public long ramBytesUsed() {
return RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_LONG + value.length();
}
@Override
public Collection<Accountable> getChildResources() {
// TODO: more detailed ram usage?
return Collections.emptyList();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
Key key = (Key) o;
if (readerVersion != key.readerVersion) return false;
if (!shard.equals(key.shard)) return false;
if (!value.equals(key.value)) return false;
return true;
}
@Override
public int hashCode() {
int result = shard.hashCode();
result = 31 * result + Long.hashCode(readerVersion);
result = 31 * result + value.hashCode();
return result;
}
}
private class CleanupKey implements IndexReader.ReaderClosedListener {
IndexShard indexShard;
long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
private CleanupKey(IndexShard indexShard, long readerVersion) {
this.indexShard = indexShard;
this.readerVersion = readerVersion;
}
@Override
public void onClose(IndexReader reader) {
Boolean remove = registeredClosedListeners.remove(this);
if (remove != null) {
keysToClean.add(this);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
CleanupKey that = (CleanupKey) o;
if (readerVersion != that.readerVersion) return false;
if (!indexShard.equals(that.indexShard)) return false;
return true;
}
@Override
public int hashCode() {
int result = indexShard.hashCode();
result = 31 * result + Long.hashCode(readerVersion);
return result;
}
}
private class Reaper implements Runnable {
private final ObjectSet<CleanupKey> currentKeysToClean = new ObjectHashSet<>();
private final ObjectSet<IndexShard> currentFullClean = new ObjectHashSet<>();
private volatile boolean closed;
void close() {
closed = true;
}
@Override
public void run() {
if (closed) {
return;
}
if (keysToClean.isEmpty()) {
schedule();
return;
}
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
reap();
schedule();
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run ReaderCleaner - execution rejected", ex);
}
}
private void schedule() {
try {
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not schedule ReaderCleaner - execution rejected", ex);
}
}
synchronized void reap() {
currentKeysToClean.clear();
currentFullClean.clear();
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerVersion == -1 || cleanupKey.indexShard.state() == IndexShardState.CLOSED) {
// -1 indicates full cleanup, as does a closed shard
currentFullClean.add(cleanupKey.indexShard);
} else {
currentKeysToClean.add(cleanupKey);
}
}
if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) {
CleanupKey lookupKey = new CleanupKey(null, -1);
for (Iterator<Key> iterator = cache.keys().iterator(); iterator.hasNext(); ) {
Key key = iterator.next();
if (currentFullClean.contains(key.shard)) {
iterator.remove();
} else {
lookupKey.indexShard = key.shard;
lookupKey.readerVersion = key.readerVersion;
if (currentKeysToClean.contains(lookupKey)) {
iterator.remove();
}
}
}
}
cache.refresh();
currentKeysToClean.clear();
currentFullClean.clear();
}
}
private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception {
// TODO: for now, this will create different keys for different JSON order
// TODO: tricky to get around this, need to parse and order all, which can be expensive
return new Key(context.indexShard(),
((DirectoryReader) context.searcher().getIndexReader()).getVersion(),
request.cacheKey());
}
}

View File

@ -54,7 +54,7 @@ import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
@ -131,8 +131,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
private final FetchPhase fetchPhase;
private final IndicesRequestCache indicesQueryCache;
private final long defaultKeepAlive;
private volatile TimeValue defaultSearchTimeout;
@ -161,7 +159,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
this.dfsPhase = dfsPhase;
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
this.indicesQueryCache = indicesService.getIndicesRequestCache();
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
this.defaultKeepAlive = DEFAULT_KEEPALIVE_SETTING.get(settings).millis();
@ -250,9 +247,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
*/
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context,
final QueryPhase queryPhase) throws Exception {
final boolean canCache = indicesQueryCache.canCache(request, context);
final boolean canCache = indicesService.canCache(request, context);
if (canCache) {
indicesQueryCache.loadIntoContext(request, context, queryPhase);
indicesService.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}

View File

@ -55,9 +55,8 @@ import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.script.ScriptContextRegistry;

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.indices.cache.query.terms.TermsLookup;
import org.elasticsearch.indices.TermsLookup;
import org.hamcrest.Matchers;
import org.junit.Before;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.indices.cache.query;
package org.elasticsearch.indices;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
@ -35,9 +35,9 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;

View File

@ -17,11 +17,11 @@
* under the License.
*/
package org.elasticsearch.indices.cache.query;
package org.elasticsearch.indices;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
@ -50,15 +50,18 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
// which used to not work well with the query cache because of the handles stream output
// see #9500
final SearchResponse r1 = client().prepareSearch("index").setSize(0).setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(dateHistogram("histo").field("f").timeZone("+01:00").minDocCount(0).interval(DateHistogramInterval.MONTH)).get();
.addAggregation(dateHistogram("histo").field("f").timeZone("+01:00")
.minDocCount(0).interval(DateHistogramInterval.MONTH)).get();
assertSearchResponse(r1);
// The cached is actually used
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), greaterThan(0L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache()
.getMemorySizeInBytes(), greaterThan(0L));
for (int i = 0; i < 10; ++i) {
final SearchResponse r2 = client().prepareSearch("index").setSize(0).setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(dateHistogram("histo").field("f").timeZone("+01:00").minDocCount(0).interval(DateHistogramInterval.MONTH)).get();
.addAggregation(dateHistogram("histo").field("f").timeZone("+01:00").minDocCount(0)
.interval(DateHistogramInterval.MONTH)).get();
assertSearchResponse(r2);
Histogram h1 = r1.getAggregations().get("histo");
Histogram h2 = r2.getAggregations().get("histo");

View File

@ -0,0 +1,366 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
public class IndicesRequestCacheTests extends ESTestCase {
public void testBasicOperationsCache() throws Exception {
ShardRequestCache requestCacheStats = new ShardRequestCache();
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
AtomicBoolean indexShard = new AtomicBoolean(true);
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
// initial cache
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, cache.count());
// cache hit
value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length());
assertEquals(1, cache.numRegisteredCloseListeners());
// release
if (randomBoolean()) {
reader.close();
} else {
indexShard.set(false); // closed shard but reader is still open
cache.clear(entity);
}
cache.cleanCache();
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(0, cache.count());
assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt());
IOUtils.close(reader, writer, dir, cache);
assertEquals(0, cache.numRegisteredCloseListeners());
}
public void testCacheWithDifferentEntityInstance() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
AtomicBoolean indexShard = new AtomicBoolean(true);
ShardRequestCache requestCacheStats = new ShardRequestCache();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
// initial cache
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, cache.count());
assertEquals(1, cache.numRegisteredCloseListeners());
final int cacheSize = requestCacheStats.stats().getMemorySize().bytesAsInt();
value = cache.getOrCompute(new TestEntity(requestCacheStats, reader, indexShard, 0), reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, cache.count());
assertEquals(cacheSize, requestCacheStats.stats().getMemorySize().bytesAsInt());
assertEquals(1, cache.numRegisteredCloseListeners());
IOUtils.close(reader, writer, dir, cache);
}
public void testCacheDifferentReaders() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
AtomicBoolean indexShard = new AtomicBoolean(true);
ShardRequestCache requestCacheStats = new ShardRequestCache();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
// initial cache
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length());
final int cacheSize = requestCacheStats.stats().getMemorySize().bytesAsInt();
assertEquals(1, cache.numRegisteredCloseListeners());
// cache the second
value = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value.toUtf8());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertEquals(2, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > cacheSize + value.length());
assertEquals(2, cache.numRegisteredCloseListeners());
value = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value.toUtf8());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertEquals(2, cache.count());
value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(2, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertEquals(2, cache.count());
reader.close();
cache.cleanCache();
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertEquals(1, cache.count());
assertEquals(cacheSize, requestCacheStats.stats().getMemorySize().bytesAsInt());
assertEquals(1, cache.numRegisteredCloseListeners());
// release
if (randomBoolean()) {
secondReader.close();
} else {
indexShard.set(false); // closed shard but reader is still open
cache.clear(secondEntity);
}
cache.cleanCache();
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertEquals(0, cache.count());
assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt());
IOUtils.close(secondReader, writer, dir, cache);
assertEquals(0, cache.numRegisteredCloseListeners());
}
public void testEviction() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.builder()
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), "113b") // the first 2 cache entries add up to 112b
.build());
AtomicBoolean indexShard = new AtomicBoolean(true);
ShardRequestCache requestCacheStats = new ShardRequestCache();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
DirectoryReader thirdReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TestEntity thirddEntity = new TestEntity(requestCacheStats, thirdReader, indexShard, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.toUtf8());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.toUtf8());
logger.info(requestCacheStats.stats().getMemorySize().toString());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
assertEquals("baz", value3.toUtf8());
assertEquals(2, cache.count());
assertEquals(1, requestCacheStats.stats().getEvictions());
IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
}
public void testClearAllEntityIdentity() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
AtomicBoolean indexShard = new AtomicBoolean(true);
ShardRequestCache requestCacheStats = new ShardRequestCache();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
DirectoryReader thirdReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer, true), new ShardId("foo", "bar", 1));
AtomicBoolean differentIdentity = new AtomicBoolean(true);
TestEntity thirddEntity = new TestEntity(requestCacheStats, thirdReader, differentIdentity, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.toUtf8());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.toUtf8());
logger.info(requestCacheStats.stats().getMemorySize().toString());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
assertEquals("baz", value3.toUtf8());
assertEquals(3, cache.count());
final long hitCount = requestCacheStats.stats().getHitCount();
// clear all for the indexShard Idendity even though is't still open
cache.clear(randomFrom(entity, secondEntity));
cache.cleanCache();
assertEquals(1, cache.count());
// third has not been validated since it's a different identity
value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
assertEquals("baz", value3.toUtf8());
IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
}
public Iterable<Field> newDoc(int id, String value) {
return Arrays.asList(newField("id", Integer.toString(id), StringField.TYPE_STORED), newField("value", value,
StringField.TYPE_STORED));
}
private class TestEntity implements IndicesRequestCache.CacheEntity {
private final DirectoryReader reader;
private final int id;
private final AtomicBoolean identity;
private final ShardRequestCache shardRequestCache;
private int loaded;
private TestEntity(ShardRequestCache shardRequestCache, DirectoryReader reader, AtomicBoolean identity, int id) {
this.reader = reader;
this.id = id;
this.identity = identity;
this.shardRequestCache = shardRequestCache;
}
@Override
public IndicesRequestCache.Value loadValue() throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(new TermQuery(new Term("id", Integer.toString(this.id))), 1);
assertEquals(1, topDocs.totalHits);
Document document = reader.document(topDocs.scoreDocs[0].doc);
BytesArray value = new BytesArray(document.get("value"));
loaded++;
return new IndicesRequestCache.Value(value, value.length());
}
@Override
public void onCached(IndicesRequestCache.Key key, IndicesRequestCache.Value value) {
shardRequestCache.onCached(key, value);
}
@Override
public boolean isOpen() {
return identity.get();
}
@Override
public Object getCacheIdentity() {
return identity;
}
@Override
public void onHit() {
shardRequestCache.onHit();
}
@Override
public void onMiss() {
shardRequestCache.onMiss();
}
@Override
public void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> notification) {
shardRequestCache.onRemoval(notification.getKey(), notification.getValue(),
notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED);
}
}
}

View File

@ -17,10 +17,11 @@
* under the License.
*/
package org.elasticsearch.indices.cache.query.terms;
package org.elasticsearch.indices;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.indices.TermsLookup;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;

View File

@ -48,7 +48,8 @@ import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -78,7 +79,7 @@ public class IndexStatsIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
//Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad
return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal))
.put(IndicesRequestCache.INDICES_CACHE_REQUEST_CLEAN_INTERVAL.getKey(), "1ms")
.put(IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), "1ms")
.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)
.put(IndexModule.INDEX_QUERY_CACHE_TYPE_SETTING.getKey(), IndexModule.INDEX_QUERY_CACHE)
.build();

View File

@ -20,7 +20,6 @@
package org.elasticsearch.search.query;
import org.apache.lucene.util.English;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
@ -29,9 +28,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
@ -42,7 +39,7 @@ import org.elasticsearch.index.query.WrapperQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.index.search.MatchQuery;
import org.elasticsearch.index.search.MatchQuery.Type;
import org.elasticsearch.indices.cache.query.terms.TermsLookup;
import org.elasticsearch.indices.TermsLookup;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@ -86,7 +83,6 @@ import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
import static org.elasticsearch.index.query.QueryBuilders.typeQuery;
import static org.elasticsearch.index.query.QueryBuilders.wildcardQuery;
import static org.elasticsearch.index.query.QueryBuilders.wrapperQuery;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFirstHit;

View File

@ -46,7 +46,7 @@ import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.indices.cache.query.terms.TermsLookup;
import org.elasticsearch.indices.TermsLookup;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESIntegTestCase;

View File

@ -274,6 +274,12 @@ now doesn't accept a value less than `100ms` which prevents fsyncing too often i
The deprecated settings `index.cache.query.enable` and `indices.cache.query.size` have been removed and are replaced with
`index.requests.cache.enable` and `indices.requests.cache.size` respectively.
`indices.requests.cache.clean_interval` has been replaced with `indices.cache.clean_interval` and is no longer supported.
==== Field Data Cache Settings
`indices.fielddata.cache.clean_interval` has been replaced with `indices.cache.clean_interval` and is no longer supported.
==== Allocation settings
Allocation settings deprecated in 1.x have been removed:

View File

@ -108,7 +108,7 @@ import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.index.IndexWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.NodeMocksPlugin;
import org.elasticsearch.plugins.Plugin;