Query Cache: Support shard level query response caching

The query cache allow to cache the (binary serialized) response of the shard level query phase execution based on the actual request as the key. The cache is fully coherent with the semantics of NRT, with a refresh (that actually ended up refreshing) causing previous cached entries on the relevant shard to be invalidated and eventually evicted.

This change enables query caching as an opt in index level setting, called `index.cache.query.enable` and defaults to `false`. The setting can be changed dynamically on an index. The cache is only enabled for search requests with search_type count.

The indices query cache is a node level query cache. The `indices.cache.query.size` controls what is the size (bytes wise) the cache will take, and defaults to `1%` of the heap. Note, this cache is very effective with small values in it already. There is also the advanced option to set `indices.cache.query.expire` that allow to control after a certain time of inaccessibility the cache will be evicted.

Note, the request takes the search "body" as is (bytes), and uses it as the key. This means same JSON but with different key order will constitute different cache entries.

This change includes basic stats (shard level, index/indices level, and node level) for the query cache, showing how much is used and eviction rates.

While this is a good first step, and the goal is to get it in, there are a few things that would be great additions to this work, but they can be done as additional pull requests:

- More stats, specifically cache hit and cache miss, per shard.
- Request level flag, defaults to "not set" (inheriting what the setting is).
- Allowing to change the cache size using the cluster update settings API
- Consider enabling the cache to query phase also when asking hits are involved, note, this will only include the "top docs", not the actual hits.
- See if there is a performant manner to solve the "out of order" of keys in the JSON case.
- Maybe introduce a filter element, that is outside of the request, that is checked, and if it matches all docs in a shard, will not be used as part of the key. This will help with time based indices and moving windows for shards that fall "inside" the window to be more effective caching wise.
- Add a more infra level support in search context that allows for any element to mark the search as non deterministic (on top of the support for "now"), and use it to not cache search responses.

closes #7161
This commit is contained in:
Shay Banon 2014-06-28 16:54:32 +02:00
parent 35e67c84fa
commit 418ce50ec4
39 changed files with 1122 additions and 217 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.cache.clear;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -34,6 +35,7 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
private boolean fieldDataCache = false;
private boolean idCache = false;
private boolean recycler = false;
private boolean queryCache = false;
private String[] fields = null;
private String[] filterKeys = null;
@ -54,6 +56,15 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
return this;
}
public boolean queryCache() {
return this.queryCache;
}
public ClearIndicesCacheRequest queryCache(boolean queryCache) {
this.queryCache = queryCache;
return this;
}
public boolean fieldDataCache() {
return this.fieldDataCache;
}
@ -107,6 +118,9 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
recycler = in.readBoolean();
fields = in.readStringArray();
filterKeys = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
queryCache = in.readBoolean();
}
}
public void writeTo(StreamOutput out) throws IOException {
@ -117,7 +131,8 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
out.writeBoolean(recycler);
out.writeStringArrayNullable(fields);
out.writeStringArrayNullable(filterKeys);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(queryCache);
}
}
}

View File

@ -38,6 +38,11 @@ public class ClearIndicesCacheRequestBuilder extends BroadcastOperationRequestBu
return this;
}
public ClearIndicesCacheRequestBuilder setQueryCache(boolean queryCache) {
request.queryCache(queryCache);
return this;
}
public ClearIndicesCacheRequestBuilder setFieldDataCache(boolean fieldDataCache) {
request.fieldDataCache(fieldDataCache);
return this;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.cache.clear;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -34,6 +35,7 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
private boolean fieldDataCache = false;
private boolean idCache = false;
private boolean recycler;
private boolean queryCache = false;
private String[] fields = null;
private String[] filterKeys = null;
@ -49,12 +51,17 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
fields = request.fields();
filterKeys = request.filterKeys();
recycler = request.recycler();
queryCache = request.queryCache();
}
public boolean filterCache() {
return filterCache;
}
public boolean queryCache() {
return queryCache;
}
public boolean fieldDataCache() {
return this.fieldDataCache;
}
@ -89,6 +96,9 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
recycler = in.readBoolean();
fields = in.readStringArray();
filterKeys = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
queryCache = in.readBoolean();
}
}
@Override
@ -100,5 +110,8 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
out.writeBoolean(recycler);
out.writeStringArrayNullable(fields);
out.writeStringArrayNullable(filterKeys);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(queryCache);
}
}
}

View File

@ -36,8 +36,10 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -53,16 +55,16 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
private final IndicesService indicesService;
private final IndicesTermsFilterCache termsFilterCache;
private final CacheRecycler cacheRecycler;
private final IndicesQueryCache indicesQueryCache;
@Inject
public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, IndicesTermsFilterCache termsFilterCache,
CacheRecycler cacheRecycler, ActionFilters actionFilters) {
IndicesQueryCache indicesQueryCache, ActionFilters actionFilters) {
super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
this.termsFilterCache = termsFilterCache;
this.cacheRecycler = cacheRecycler;
this.indicesQueryCache = indicesQueryCache;
}
@Override
@ -116,6 +118,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRequest request) throws ElasticsearchException {
IndexService service = indicesService.indexService(request.index());
if (service != null) {
IndexShard shard = service.shard(request.shardId());
// we always clear the query cache
service.cache().queryParserCache().clear();
boolean clearedAtLeastOne = false;
@ -139,6 +142,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
}
}
}
if (request.queryCache()) {
clearedAtLeastOne = true;
indicesQueryCache.clear(shard);
}
if (request.recycler()) {
logger.debug("Clear CacheRecycler on index [{}]", service.index());
clearedAtLeastOne = true;
@ -158,6 +165,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
service.cache().clear("api");
service.fieldData().clear();
termsFilterCache.clear("api");
indicesQueryCache.clear(shard);
}
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
@ -111,6 +112,9 @@ public class CommonStats implements Streamable, ToXContent {
case Suggest:
suggest = new SuggestStats();
break;
case QueryCache:
queryCache = new QueryCacheStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
@ -174,6 +178,9 @@ public class CommonStats implements Streamable, ToXContent {
case Suggest:
suggest = indexShard.suggestStats();
break;
case QueryCache:
queryCache = indexShard.queryCache().stats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
@ -231,6 +238,9 @@ public class CommonStats implements Streamable, ToXContent {
@Nullable
public SuggestStats suggest;
@Nullable
public QueryCacheStats queryCache;
public void add(CommonStats stats) {
if (docs == null) {
if (stats.getDocs() != null) {
@ -370,6 +380,14 @@ public class CommonStats implements Streamable, ToXContent {
} else {
suggest.add(stats.getSuggest());
}
if (queryCache == null) {
if (stats.getQueryCache() != null) {
queryCache = new QueryCacheStats();
queryCache.add(stats.getQueryCache());
}
} else {
queryCache.add(stats.getQueryCache());
}
}
@Nullable
@ -457,6 +475,11 @@ public class CommonStats implements Streamable, ToXContent {
return suggest;
}
@Nullable
public QueryCacheStats getQueryCache() {
return queryCache;
}
public static CommonStats readCommonStats(StreamInput in) throws IOException {
CommonStats stats = new CommonStats();
stats.readFrom(in);
@ -514,6 +537,9 @@ public class CommonStats implements Streamable, ToXContent {
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
suggest = in.readOptionalStreamable(new SuggestStats());
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
queryCache = in.readOptionalStreamable(new QueryCacheStats());
}
}
@Override
@ -612,6 +638,9 @@ public class CommonStats implements Streamable, ToXContent {
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeOptionalStreamable(suggest);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeOptionalStreamable(queryCache);
}
}
// note, requires a wrapping object
@ -668,6 +697,9 @@ public class CommonStats implements Streamable, ToXContent {
if (suggest != null) {
suggest.toXContent(builder, params);
}
if (queryCache != null) {
queryCache.toXContent(builder, params);
}
return builder;
}
}

View File

@ -224,7 +224,8 @@ public class CommonStatsFlags implements Streamable, Cloneable {
Completion("completion"),
Segments("segments"),
Translog("translog"),
Suggest("suggest");
Suggest("suggest"),
QueryCache("query_cache");
private final String restName;

View File

@ -256,6 +256,15 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
return flags.isSet(Flag.Suggest);
}
public IndicesStatsRequest queryCache(boolean queryCache) {
flags.set(Flag.QueryCache, queryCache);
return this;
}
public boolean queryCache() {
return flags.isSet(Flag.QueryCache);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -163,6 +163,11 @@ public class IndicesStatsRequestBuilder extends BroadcastOperationRequestBuilder
return this;
}
public IndicesStatsRequestBuilder setQueryCache(boolean queryCache) {
request.queryCache(queryCache);
return this;
}
@Override
protected void doExecute(ActionListener<IndicesStatsResponse> listener) {
client.stats(request, listener);

View File

@ -192,6 +192,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
if (request.request.suggest()) {
flags.set(CommonStatsFlags.Flag.Suggest);
}
if (request.request.queryCache()) {
flags.set(CommonStatsFlags.Flag.QueryCache);
}
return new ShardStats(indexShard, flags);
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
@ -55,7 +56,7 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
new AsyncAction(searchRequest, listener).start();
}
private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
@ -67,7 +68,7 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.atomic.AtomicInteger;
@ -60,7 +61,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
new AsyncAction(searchRequest, listener).start();
}
private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
@ -77,7 +78,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}
@ -97,9 +98,9 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = firstResults.get(entry.index);
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.query;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*/
public class QueryCacheStats implements Streamable, ToXContent {
long memorySize;
long evictions;
public QueryCacheStats() {
}
public QueryCacheStats(long memorySize, long evictions) {
this.memorySize = memorySize;
this.evictions = evictions;
}
public void add(QueryCacheStats stats) {
this.memorySize += stats.memorySize;
this.evictions += stats.evictions;
}
public long getMemorySizeInBytes() {
return this.memorySize;
}
public ByteSizeValue getMemorySize() {
return new ByteSizeValue(memorySize);
}
public long getEvictions() {
return this.evictions;
}
@Override
public void readFrom(StreamInput in) throws IOException {
memorySize = in.readVLong();
evictions = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(memorySize);
out.writeVLong(evictions);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.QUERY_CACHE_STATS);
builder.byteSizeField(Fields.MEMORY_SIZE_IN_BYTES, Fields.MEMORY_SIZE, memorySize);
builder.field(Fields.EVICTIONS, getEvictions());
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString QUERY_CACHE_STATS = new XContentBuilderString("query_cache");
static final XContentBuilderString MEMORY_SIZE = new XContentBuilderString("memory_size");
static final XContentBuilderString MEMORY_SIZE_IN_BYTES = new XContentBuilderString("memory_size_in_bytes");
static final XContentBuilderString EVICTIONS = new XContentBuilderString("evictions");
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.query;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.lucene.search.DocIdSet;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
/**
*/
public class ShardQueryCache extends AbstractIndexShardComponent implements RemovalListener<IndicesQueryCache.Key, BytesReference> {
final CounterMetric evictionsMetric = new CounterMetric();
final CounterMetric totalMetric = new CounterMetric();
@Inject
public ShardQueryCache(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
public QueryCacheStats stats() {
return new QueryCacheStats(totalMetric.count(), evictionsMetric.count());
}
public void onCached(IndicesQueryCache.Key key, BytesReference value) {
totalMetric.inc(key.ramBytesUsed() + value.length());
}
@Override
public void onRemoval(RemovalNotification<IndicesQueryCache.Key, BytesReference> removalNotification) {
if (removalNotification.wasEvicted()) {
evictionsMetric.inc();
}
long dec = 0;
if (removalNotification.getKey() != null) {
dec += removalNotification.getKey().ramBytesUsed();
}
if (removalNotification.getValue() != null) {
dec += removalNotification.getValue().length();
}
totalMetric.dec(dec);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.query;
import org.elasticsearch.common.inject.AbstractModule;
/**
*/
public class ShardQueryCacheModule extends AbstractModule {
@Override
protected void configure() {
bind(ShardQueryCache.class).asEagerSingleton();
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.filter.ShardFilterCacheModule;
import org.elasticsearch.index.cache.query.ShardQueryCacheModule;
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineModule;
@ -329,6 +330,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
modules.add(new ShardFilterCacheModule());
modules.add(new ShardQueryCacheModule());
modules.add(new ShardFieldDataModule());
modules.add(new TranslogModule(indexSettings));
modules.add(new EngineModule(indexSettings));

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
@ -119,6 +120,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME);
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH);
indexDynamicSettings.addDynamicSetting(InternalIndicesWarmer.INDEX_WARMER_ENABLED);
indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, Validator.BOOLEAN);
}
public void addDynamicSettings(String... settings) {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.query.ShardQueryCache;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
@ -77,6 +78,8 @@ public interface IndexShard extends IndexShardComponent {
ShardFilterCache filterCache();
ShardQueryCache queryCache();
ShardFieldData fieldData();
ShardRouting routingEntry();

View File

@ -50,6 +50,7 @@ import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.query.ShardQueryCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
@ -121,6 +122,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService;
private final ShardFilterCache shardFilterCache;
private final ShardQueryCache shardQueryCache;
private final ShardFieldData shardFieldData;
private final PercolatorQueriesRegistry percolatorQueriesRegistry;
private final ShardPercolateService shardPercolateService;
@ -153,7 +155,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorService termVectorService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService) {
ShardTermVectorService termVectorService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache) {
super(shardId, indexSettings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
@ -172,6 +174,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.searchService = searchService;
this.shardWarmerService = shardWarmerService;
this.shardFilterCache = shardFilterCache;
this.shardQueryCache = shardQueryCache;
this.shardFieldData = shardFieldData;
this.percolatorQueriesRegistry = percolatorQueriesRegistry;
this.shardPercolateService = shardPercolateService;
@ -256,6 +259,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this.shardFilterCache;
}
@Override
public ShardQueryCache queryCache() {
return this.shardQueryCache;
}
@Override
public ShardFieldData fieldData() {
return this.shardFieldData;

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
@ -72,6 +73,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndexingMemoryController.class).asEagerSingleton();
bind(IndicesFilterCache.class).asEagerSingleton();
bind(IndicesQueryCache.class).asEagerSingleton();
bind(IndicesFieldDataCache.class).asEagerSingleton();
bind(IndicesTermsFilterCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
@ -136,6 +137,11 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
return stats.getFilterCache();
}
@Nullable
public QueryCacheStats getQueryCache() {
return stats.getQueryCache();
}
@Nullable
public IdCacheStats getIdCache() {
return stats.getIdCache();

View File

@ -185,7 +185,6 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
keys.add(it.next());
it.remove();
}
cache.cleanUp();
if (!keys.isEmpty()) {
for (Iterator<WeightedFilterCache.FilterCacheKey> it = cache.asMap().keySet().iterator(); it.hasNext(); ) {
WeightedFilterCache.FilterCacheKey filterCacheKey = it.next();
@ -195,6 +194,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
}
}
}
cache.cleanUp();
schedule();
keys.clear();
}

View File

@ -0,0 +1,498 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cache.query;
import com.carrotsearch.hppc.ObjectOpenHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.google.common.cache.*;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.Strings.hasLength;
/**
* The indices query cache allows to cache a shard level query stage responses, helping with improving
* similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent
* with the semantics of NRT (the index reader version is part of the cache key), and relies on size based
* eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that
* are no longer used or closed shards.
* <p/>
* Currently, the cache is only enabled for {@link SearchType#COUNT}, and can only be opted in on an index
* level setting that can be dynamically changed and defaults to false.
* <p/>
* There are still several TODOs left in this class, some easily addressable, some more complex, but the support
* is functional.
*/
public class IndicesQueryCache extends AbstractComponent implements RemovalListener<IndicesQueryCache.Key, BytesReference> {
/**
* A setting to enable or disable query caching on an index level. Its dynamic by default
* since we are checking on the cluster state IndexMetaData always.
*/
public static final String INDEX_CACHE_QUERY_ENABLED = "index.cache.query.enable";
public static final String INDICES_CACHE_QUERY_CLEAN_INTERVAL = "indices.cache.query.clean_interval";
public static final String INDICES_CACHE_QUERY_SIZE = "indices.cache.query.size";
public static final String INDICES_CACHE_QUERY_EXPIRE = "indices.cache.query.expire";
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TimeValue cleanInterval;
private final Reaper reaper;
final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
//TODO make these changes configurable on the cluster level
private volatile String size;
private volatile TimeValue expire;
//TODO expose this in our stats APIs
private volatile Cache<Key, BytesReference> cache;
@Inject
public IndicesQueryCache(Settings settings, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.cleanInterval = settings.getAsTime(INDICES_CACHE_QUERY_CLEAN_INTERVAL, TimeValue.timeValueSeconds(60));
// this cache can be very small yet still be very effective
this.size = settings.get(INDICES_CACHE_QUERY_SIZE, "1%");
this.expire = settings.getAsTime(INDICES_CACHE_QUERY_EXPIRE, null);
buildCache();
this.reaper = new Reaper();
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, reaper);
}
private void buildCache() {
long sizeInBytes = MemorySizeValue.parseBytesSizeValueOrHeapRatio(size).bytes();
if (sizeInBytes > ByteSizeValue.MAX_GUAVA_CACHE_SIZE.bytes()) {
logger.warn("reducing requested query cache size of [{}] to the maximum allowed size of [{}]", new ByteSizeValue(sizeInBytes), ByteSizeValue.MAX_GUAVA_CACHE_SIZE);
sizeInBytes = ByteSizeValue.MAX_GUAVA_CACHE_SIZE.bytes();
// Even though it feels wrong for size and sizeInBytes to get out of
// sync we don't update size here because it might cause the cache
// to be rebuilt every time new settings are applied.
}
CacheBuilder<Key, BytesReference> cacheBuilder = CacheBuilder.newBuilder()
.maximumWeight(sizeInBytes).weigher(new QueryCacheWeigher()).removalListener(this);
// defaults to 4, but this is a busy map for all indices, increase it a bit
cacheBuilder.concurrencyLevel(16);
if (expire != null) {
cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);
}
cache = cacheBuilder.build();
}
private static class QueryCacheWeigher implements Weigher<Key, BytesReference> {
@Override
public int weigh(Key key, BytesReference value) {
// TODO add sizeInBytes to BytesReference, since it might be paged.... (Accountable)
return (int) (key.ramBytesUsed() + value.length());
}
}
public void close() {
reaper.close();
cache.invalidateAll();
}
public void clear(IndexShard shard) {
if (shard == null) {
return;
}
keysToClean.add(new CleanupKey(shard, -1));
logger.trace("{} explicit cache clear", shard.shardId());
reaper.reap();
}
@Override
public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
if (notification.getKey() == null) {
return;
}
notification.getKey().shard.queryCache().onRemoval(notification);
}
/**
* Can the shard request be cached at all?
*/
public boolean canCache(ShardSearchRequest request, SearchContext context) {
// TODO: for now, template is not supported, though we could use the generated bytes as the key
if (hasLength(request.templateSource())) {
return false;
}
// for now, only enable it for search type count
if (request.searchType() != SearchType.COUNT) {
return false;
}
IndexMetaData index = clusterService.state().getMetaData().index(request.index());
if (index == null) { // in case we didn't yet have the cluster state, or it just got deleted
return false;
}
if (!index.settings().getAsBoolean(INDEX_CACHE_QUERY_ENABLED, Boolean.FALSE)) {
return false;
}
// if the reader is not a directory reader, we can't get the version from it
if (!(context.searcher().getIndexReader() instanceof DirectoryReader)) {
return false;
}
// if now in millis is used (or in the future, a more generic "isDeterministic" flag
// then we can't cache based on "now" key within the search request, as it is not deterministic
if (context.nowInMillisUsed()) {
return false;
}
// TODO allow to have a queryCache level flag on the request as well
return true;
}
/**
* Loads the cache result, computing it if needed by executing the query phase. The combination of load + compute allows
* to have a single load operation that will cause other requests with the same key to wait till its loaded an reuse
* the same cache.
*/
public QuerySearchResultProvider load(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
Key key = buildKey(request, context);
Loader loader = new Loader(queryPhase, context, key);
BytesReference value = cache.get(key, loader);
if (loader.isLoaded()) {
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(context.indexShard(), ((DirectoryReader) context.searcher().getIndexReader()).getVersion());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
if (previous == null) {
context.searcher().getIndexReader().addReaderClosedListener(cleanupKey);
}
}
}
// try and be smart, and reuse an already loaded and constructed QueryResult of in VM execution
return new BytesQuerySearchResult(context.id(), context.shardTarget(), value, loader.isLoaded() ? context.queryResult() : null);
}
private static class Loader implements Callable<BytesReference> {
private final QueryPhase queryPhase;
private final SearchContext context;
private final IndicesQueryCache.Key key;
private boolean loaded;
Loader(QueryPhase queryPhase, SearchContext context, IndicesQueryCache.Key key) {
this.queryPhase = queryPhase;
this.context = context;
this.key = key;
}
public boolean isLoaded() {
return this.loaded;
}
@Override
public BytesReference call() throws Exception {
queryPhase.execute(context);
BytesStreamOutput out = new BytesStreamOutput();
context.queryResult().writeToNoId(out);
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
BytesReference value = out.bytes();
assert verifyCacheSerializationSameAsQueryResult(value, context, context.queryResult());
loaded = true;
key.shard.queryCache().onCached(key, value);
return value;
}
}
public static class Key implements Accountable {
public final IndexShard shard; // use as identity equality
public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
public final BytesReference value;
Key(IndexShard shard, long readerVersion, BytesReference value) {
this.shard = shard;
this.readerVersion = readerVersion;
this.value = value;
}
@Override
public long ramBytesUsed() {
return RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_LONG + value.length();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
Key key = (Key) o;
if (readerVersion != key.readerVersion) return false;
if (!shard.equals(key.shard)) return false;
if (!value.equals(key.value)) return false;
return true;
}
@Override
public int hashCode() {
int result = shard.hashCode();
result = 31 * result + (int) (readerVersion ^ (readerVersion >>> 32));
result = 31 * result + value.hashCode();
return result;
}
}
private class CleanupKey implements IndexReader.ReaderClosedListener {
IndexShard indexShard;
long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
private CleanupKey(IndexShard indexShard, long readerVersion) {
this.indexShard = indexShard;
this.readerVersion = readerVersion;
}
@Override
public void onClose(IndexReader reader) {
Boolean remove = registeredClosedListeners.remove(this);
if (remove != null) {
keysToClean.add(this);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
CleanupKey that = (CleanupKey) o;
if (readerVersion != that.readerVersion) return false;
if (!indexShard.equals(that.indexShard)) return false;
return true;
}
@Override
public int hashCode() {
int result = indexShard.hashCode();
result = 31 * result + (int) (readerVersion ^ (readerVersion >>> 32));
return result;
}
}
private class Reaper implements Runnable {
private final ObjectSet<CleanupKey> currentKeysToClean = ObjectOpenHashSet.newInstance();
private final ObjectSet<IndexShard> currentFullClean = ObjectOpenHashSet.newInstance();
private volatile boolean closed;
void close() {
closed = true;
}
@Override
public void run() {
if (closed) {
return;
}
if (keysToClean.isEmpty()) {
schedule();
return;
}
try {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
reap();
schedule();
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run ReaderCleaner - execution rejected", ex);
}
}
private void schedule() {
try {
threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not schedule ReaderCleaner - execution rejected", ex);
}
}
synchronized void reap() {
currentKeysToClean.clear();
currentFullClean.clear();
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerVersion == -1 || cleanupKey.indexShard.state() == IndexShardState.CLOSED) {
// -1 indicates full cleanup, as does a closed shard
currentFullClean.add(cleanupKey.indexShard);
} else {
currentKeysToClean.add(cleanupKey);
}
}
if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) {
CleanupKey lookupKey = new CleanupKey(null, -1);
for (Iterator<Key> iterator = cache.asMap().keySet().iterator(); iterator.hasNext(); ) {
Key key = iterator.next();
if (currentFullClean.contains(key.shard)) {
iterator.remove();
} else {
lookupKey.indexShard = key.shard;
lookupKey.readerVersion = key.readerVersion;
if (currentKeysToClean.contains(lookupKey)) {
iterator.remove();
}
}
}
}
cache.cleanUp();
currentKeysToClean.clear();
currentFullClean.clear();
}
}
private static boolean verifyCacheSerializationSameAsQueryResult(BytesReference cacheData, SearchContext context, QuerySearchResult result) throws Exception {
BytesStreamOutput out1 = new BytesStreamOutput();
new BytesQuerySearchResult(context.id(), context.shardTarget(), cacheData).writeTo(out1);
BytesStreamOutput out2 = new BytesStreamOutput();
result.writeTo(out2);
return out1.bytes().equals(out2.bytes());
}
private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception {
// TODO: for now, this will create different keys for different JSON order
// TODO: tricky to get around this, need to parse and order all, which can be expensive
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out, true);
// copy it over, most requests are small, we might as well copy to make sure we are not sliced...
// we could potentially keep it without copying, but then pay the price of extra unused bytes up to a page
return new Key(context.indexShard(),
((DirectoryReader) context.searcher().getIndexReader()).getVersion(),
out.bytes().copyBytesArray());
}
/**
* this class aim is to just provide an on the wire *write* format that is the same as {@link QuerySearchResult}
* and also provide a nice wrapper for in node communication for an already constructed {@link QuerySearchResult}.
*/
private static class BytesQuerySearchResult extends QuerySearchResultProvider {
private long id;
private SearchShardTarget shardTarget;
private BytesReference data;
private transient QuerySearchResult result;
private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data) {
this(id, shardTarget, data, null);
}
private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data, QuerySearchResult result) {
this.id = id;
this.shardTarget = shardTarget;
this.data = data;
this.result = result;
}
@Override
public boolean includeFetch() {
return false;
}
@Override
public QuerySearchResult queryResult() {
if (result == null) {
result = new QuerySearchResult(id, shardTarget);
try {
result.readFromWithId(id, data.streamInput());
} catch (Exception e) {
throw new ElasticsearchParseException("failed to parse a cached query", e);
}
}
return result;
}
@Override
public long id() {
return id;
}
@Override
public SearchShardTarget shardTarget() {
return shardTarget;
}
@Override
public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new ElasticsearchIllegalStateException("readFrom should not be called");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
// shardTarget.writeTo(out); not needed
data.writeTo(out); // we need to write teh bytes as is, to be the same as QuerySearchResult
}
}
}

View File

@ -341,7 +341,7 @@ public class PercolateContext extends SearchContext {
}
@Override
public long nowInMillis() {
protected long nowInMillisImpl() {
throw new UnsupportedOperationException();
}

View File

@ -80,6 +80,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
indicesStatsRequest.fieldData(metrics.contains("fielddata"));
indicesStatsRequest.completion(metrics.contains("completion"));
indicesStatsRequest.suggest(metrics.contains("suggest"));
indicesStatsRequest.queryCache(metrics.contains("query_cache"));
}
if (request.hasParam("groups")) {

View File

@ -123,6 +123,12 @@ public class RestIndicesAction extends AbstractCatAction {
table.addCell("filter_cache.evictions", "sibling:pri;alias:fce,filterCacheEvictions;default:false;text-align:right;desc:filter cache evictions");
table.addCell("pri.filter_cache.evictions", "default:false;text-align:right;desc:filter cache evictions");
table.addCell("query_cache.memory_size", "sibling:pri;alias:qcm,queryCacheMemory;default:false;text-align:right;desc:used query cache");
table.addCell("pri.query_cache.memory_size", "default:false;text-align:right;desc:used query cache");
table.addCell("query_cache.evictions", "sibling:pri;alias:qce,queryCacheEvictions;default:false;text-align:right;desc:query cache evictions");
table.addCell("pri.query_cache.evictions", "default:false;text-align:right;desc:query cache evictions");
table.addCell("flush.total", "sibling:pri;alias:ft,flushTotal;default:false;text-align:right;desc:number of flushes");
table.addCell("pri.flush.total", "default:false;text-align:right;desc:number of flushes");
@ -302,6 +308,12 @@ public class RestIndicesAction extends AbstractCatAction {
table.addCell(indexStats == null ? null : indexStats.getTotal().getFilterCache().getEvictions());
table.addCell(indexStats == null ? null : indexStats.getPrimaries().getFilterCache().getEvictions());
table.addCell(indexStats == null ? null : indexStats.getTotal().getQueryCache().getMemorySize());
table.addCell(indexStats == null ? null : indexStats.getPrimaries().getQueryCache().getMemorySize());
table.addCell(indexStats == null ? null : indexStats.getTotal().getQueryCache().getEvictions());
table.addCell(indexStats == null ? null : indexStats.getPrimaries().getQueryCache().getEvictions());
table.addCell(indexStats == null ? null : indexStats.getTotal().getFlush().getTotal());
table.addCell(indexStats == null ? null : indexStats.getPrimaries().getFlush().getTotal());

View File

@ -122,6 +122,9 @@ public class RestNodesAction extends AbstractCatAction {
table.addCell("filter_cache.memory_size", "alias:fcm,filterCacheMemory;default:false;text-align:right;desc:used filter cache");
table.addCell("filter_cache.evictions", "alias:fce,filterCacheEvictions;default:false;text-align:right;desc:filter cache evictions");
table.addCell("query_cache.memory_size", "alias:qcm,queryCacheMemory;default:false;text-align:right;desc:used query cache");
table.addCell("query_cache.evictions", "alias:qce,queryCacheEvictions;default:false;text-align:right;desc:query cache evictions");
table.addCell("flush.total", "alias:ft,flushTotal;default:false;text-align:right;desc:number of flushes");
table.addCell("flush.total_time", "alias:ftt,flushTotalTime;default:false;text-align:right;desc:time spent in flush");
@ -226,6 +229,9 @@ public class RestNodesAction extends AbstractCatAction {
table.addCell(stats == null ? null : stats.getIndices().getFilterCache().getMemorySize());
table.addCell(stats == null ? null : stats.getIndices().getFilterCache().getEvictions());
table.addCell(stats == null ? null : stats.getIndices().getQueryCache().getMemorySize());
table.addCell(stats == null ? null : stats.getIndices().getQueryCache().getEvictions());
table.addCell(stats == null ? null : stats.getIndices().getFlush().getTotal());
table.addCell(stats == null ? null : stats.getIndices().getFlush().getTotalTime());

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
@ -61,6 +62,7 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.IndicesWarmer.WarmerContext;
import org.elasticsearch.script.ExecutableScript;
@ -84,6 +86,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
@ -125,6 +128,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final FetchPhase fetchPhase;
private final IndicesQueryCache indicesQueryCache;
private final long defaultKeepAlive;
private final ScheduledFuture<?> keepAliveReaper;
@ -137,7 +142,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
@Inject
public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, IndicesWarmer indicesWarmer, ThreadPool threadPool,
ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
IndicesQueryCache indicesQueryCache) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
@ -150,6 +156,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
this.dfsPhase = dfsPhase;
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
this.indicesQueryCache = indicesQueryCache;
TimeValue keepAliveInterval = componentSettings.getAsTime(KEEPALIVE_INTERVAL_COMPONENENT_KEY, timeValueMinutes(1));
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
@ -252,21 +259,35 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
}
public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createAndPutContext(request);
try {
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);
QuerySearchResultProvider result;
boolean canCache = indicesQueryCache.canCache(request, context);
if (canCache) {
result = indicesQueryCache.load(request, context, queryPhase);
} else {
queryPhase.execute(context);
result = context.queryResult();
}
if (context.searchType() == SearchType.COUNT) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
}
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
return context.queryResult();
return result;
} catch (Throwable e) {
// execution exception can happen while loading the cache, strip it
if (e instanceof ExecutionException) {
e = e.getCause();
}
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context);

View File

@ -40,6 +40,7 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -211,16 +212,16 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
}
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<QuerySearchResultProvider> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<QuerySearchResult>() {
execute(new Callable<QuerySearchResultProvider>() {
@Override
public QuerySearchResult call() throws Exception {
public QuerySearchResultProvider call() throws Exception {
return searchService.executeQueryPhase(request);
}
}, listener);
} else {
transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResult>() {
transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResultProvider>() {
@Override
public QuerySearchResult newInstance() {
@ -228,7 +229,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
@Override
public void handleResponse(QuerySearchResult response) {
public void handleResponse(QuerySearchResultProvider response) {
listener.onResult(response);
}
@ -690,7 +691,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
@Override
public void messageReceived(ShardSearchRequest request, TransportChannel channel) throws Exception {
QuerySearchResult result = searchService.executeQueryPhase(request);
QuerySearchResultProvider result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}

View File

@ -170,7 +170,7 @@ public class TopHitsContext extends SearchContext {
}
@Override
public long nowInMillis() {
protected long nowInMillisImpl() {
return context.nowInMillis();
}

View File

@ -34,7 +34,7 @@ import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchRe
/**
*
*/
public class QueryFetchSearchResult extends TransportResponse implements QuerySearchResultProvider, FetchSearchResultProvider {
public class QueryFetchSearchResult extends QuerySearchResultProvider implements FetchSearchResultProvider {
private QuerySearchResult queryResult;
private FetchSearchResult fetchResult;

View File

@ -297,7 +297,7 @@ public class DefaultSearchContext extends SearchContext {
return this;
}
public long nowInMillis() {
protected long nowInMillisImpl() {
return request.nowInMillis();
}

View File

@ -98,6 +98,8 @@ public abstract class SearchContext implements Releasable {
}
}
private boolean nowInMillisUsed;
protected abstract void doClose();
/**
@ -129,7 +131,16 @@ public abstract class SearchContext implements Releasable {
public abstract SearchContext queryBoost(float queryBoost);
public abstract long nowInMillis();
public final long nowInMillis() {
nowInMillisUsed = true;
return nowInMillisImpl();
}
public final boolean nowInMillisUsed() {
return nowInMillisUsed;
}
protected abstract long nowInMillisImpl();
public abstract Scroll scroll();

View File

@ -259,11 +259,17 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
@Override
public void writeTo(StreamOutput out) throws IOException {
writeTo(out, false);
}
public void writeTo(StreamOutput out, boolean asKey) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeVInt(shardId);
out.writeByte(searchType.id());
if (!asKey) {
out.writeVInt(numberOfShards);
}
if (scroll == null) {
out.writeBoolean(false);
} else {
@ -274,7 +280,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
out.writeBytesReference(extraSource);
out.writeStringArray(types);
out.writeStringArrayNullable(filteringAliases);
if (!asKey) {
out.writeVLong(nowInMillis);
}
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBytesReference(templateSource);

View File

@ -39,7 +39,7 @@ import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
/**
*
*/
public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider {
public class QuerySearchResult extends QuerySearchResultProvider {
private long id;
private SearchShardTarget shardTarget;
@ -159,7 +159,12 @@ public class QuerySearchResult extends TransportResponse implements QuerySearchR
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
long id = in.readLong();
readFromWithId(id, in);
}
public void readFromWithId(long id, StreamInput in) throws IOException {
this.id = id;
// shardTarget = readSearchShardTarget(in);
from = in.readVInt();
size = in.readVInt();
@ -183,6 +188,10 @@ public class QuerySearchResult extends TransportResponse implements QuerySearchR
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
writeToNoId(out);
}
public void writeToNoId(StreamOutput out) throws IOException {
// shardTarget.writeTo(out);
out.writeVInt(from);
out.writeVInt(size);

View File

@ -20,16 +20,17 @@
package org.elasticsearch.search.query;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.transport.TransportResponse;
/**
*
*/
public interface QuerySearchResultProvider extends SearchPhaseResult {
public abstract class QuerySearchResultProvider extends TransportResponse implements SearchPhaseResult {
/**
* If both query and fetch happened on the same call.
*/
boolean includeFetch();
public abstract boolean includeFetch();
QuerySearchResult queryResult();
public abstract QuerySearchResult queryResult();
}

View File

@ -1,177 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cache;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.junit.Test;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.Matchers.*;
/**
*/
@ClusterScope(scope= Scope.SUITE, numDataNodes =1, numClientNodes = 0, randomDynamicTemplates = false)
public class CacheTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
//Filter cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad
return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("indices.cache.filter.clean_interval", "1ms").build();
}
@Test
public void testClearCacheFilterKeys() {
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client().prepareIndex("test", "type", "1").setSource("field", "value").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
SearchResponse searchResponse = client().prepareSearch().setQuery(filteredQuery(matchAllQuery(), FilterBuilders.termFilter("field", "value").cacheKey("test_key"))).execute().actionGet();
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
client().admin().indices().prepareClearCache().setFilterKeys("test_key").execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
}
@Test
public void testFieldDataStats() {
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client().prepareIndex("test", "type", "1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test", "type", "2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
// sort to load it to field data...
client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet();
client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
// sort to load it to field data...
client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet();
client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet();
// now check the per field stats
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field"), greaterThan(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes()));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).setFieldDataFields("*").execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), greaterThan(0l));
assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), lessThan(indicesStats.getTotal().getFieldData().getMemorySizeInBytes()));
client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
}
@Test
public void testClearAllCaches() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(ImmutableSettings.settingsBuilder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 1))
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test", "type", "1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test", "type", "2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setFilterCache(true)
.execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
// sort to load it to field data and filter to load filter cache
client().prepareSearch()
.setPostFilter(FilterBuilders.termFilter("field", "value1"))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
client().prepareSearch()
.setPostFilter(FilterBuilders.termFilter("field", "value2"))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setFilterCache(true)
.execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
client().admin().indices().prepareClearCache().execute().actionGet();
Thread.sleep(100); // Make sure the filter cache entries have been removed...
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setFilterCache(true)
.execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
}
}

View File

@ -20,12 +20,20 @@
package org.elasticsearch.indices.stats;
import org.apache.lucene.util.Version;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.*;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@ -38,11 +46,197 @@ import java.util.Random;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2)
public class SimpleIndexStatsTests extends ElasticsearchIntegrationTest {
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, randomDynamicTemplates = false)
public class IndexStatsTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
//Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad
return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal))
.put("indices.cache.filter.clean_interval", "1ms")
.put(IndicesQueryCache.INDICES_CACHE_QUERY_CLEAN_INTERVAL, "1ms")
.build();
}
@Test
public void testClearCacheFilterKeys() {
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2)).execute().actionGet();
client().prepareIndex("test", "type", "1").setSource("field", "value").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
SearchResponse searchResponse = client().prepareSearch().setQuery(filteredQuery(matchAllQuery(), FilterBuilders.termFilter("field", "value").cacheKey("test_key"))).execute().actionGet();
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
client().admin().indices().prepareClearCache().setFilterKeys("test_key").execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
}
@Test
public void testFieldDataStats() {
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2)).execute().actionGet();
client().prepareIndex("test", "type", "1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test", "type", "2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
// sort to load it to field data...
client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet();
client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
// sort to load it to field data...
client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet();
client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet();
// now check the per field stats
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), greaterThan(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes()));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).setFieldDataFields("*").execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), greaterThan(0l));
assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), lessThan(indicesStats.getTotal().getFieldData().getMemorySizeInBytes()));
client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
}
@Test
public void testClearAllCaches() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_replicas", 0).put("index.number_of_shards", 2))
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test", "type", "1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test", "type", "2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setFilterCache(true)
.execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
// sort to load it to field data and filter to load filter cache
client().prepareSearch()
.setPostFilter(FilterBuilders.termFilter("field", "value1"))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
client().prepareSearch()
.setPostFilter(FilterBuilders.termFilter("field", "value2"))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setFilterCache(true)
.execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l));
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L));
client().admin().indices().prepareClearCache().execute().actionGet();
Thread.sleep(100); // Make sure the filter cache entries have been removed...
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
indicesStats = client().admin().indices().prepareStats("test")
.clear().setFieldData(true).setFilterCache(true)
.execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
}
@Test
public void testQueryCache() throws Exception {
assertAcked(client().admin().indices().prepareCreate("idx").setSettings(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, true).get());
ensureGreen();
int numDocs = randomIntBetween(2, 100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; ++i) {
builders[i] = client().prepareIndex("idx", "type", Integer.toString(i)).setSource(jsonBuilder()
.startObject()
.field("common", "field")
.field("str_value", "s" + i)
.endObject());
}
indexRandom(true, builders);
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), equalTo(0l));
for (int i = 0; i < 10; i++) {
assertThat(client().prepareSearch("idx").setSearchType(SearchType.COUNT).get().getHits().getTotalHits(), equalTo((long) numDocs));
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l));
}
// index the data again...
builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; ++i) {
builders[i] = client().prepareIndex("idx", "type", Integer.toString(i)).setSource(jsonBuilder()
.startObject()
.field("common", "field")
.field("str_value", "s" + i)
.endObject());
}
indexRandom(true, builders);
refresh();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), equalTo(0l));
}
});
for (int i = 0; i < 10; i++) {
assertThat(client().prepareSearch("idx").setSearchType(SearchType.COUNT).get().getHits().getTotalHits(), equalTo((long) numDocs));
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l));
}
client().admin().indices().prepareClearCache().setQueryCache(true).get(); // clean the cache
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), equalTo(0l));
}
@Test
public void simpleStats() throws Exception {
@ -318,7 +512,7 @@ public class SimpleIndexStatsTests extends ElasticsearchIntegrationTest {
@Test
public void testFlagOrdinalOrder() {
Flag[] flags = new Flag[]{Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Merge, Flag.Flush, Flag.Refresh,
Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion, Flag.Segments, Flag.Translog, Flag.Suggest};
Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion, Flag.Segments, Flag.Translog, Flag.Suggest, Flag.QueryCache};
assertThat(flags.length, equalTo(Flag.values().length));
for (int i = 0; i < flags.length; i++) {
@ -586,6 +780,9 @@ public class SimpleIndexStatsTests extends ElasticsearchIntegrationTest {
case Suggest:
builder.setSuggest(set);
break;
case QueryCache:
builder.setQueryCache(set);
break;
default:
fail("new flag? " + flag);
break;
@ -628,6 +825,8 @@ public class SimpleIndexStatsTests extends ElasticsearchIntegrationTest {
return response.getTranslog() != null;
case Suggest:
return response.getSuggest() != null;
case QueryCache:
return response.getQueryCache() != null;
default:
fail("new flag? " + flag);
return false;

View File

@ -90,6 +90,7 @@ import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.index.translog.fs.FsTranslogFile;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.rest.RestStatus;
@ -442,6 +443,10 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
// Randomly load or don't load bloom filters:
builder.put(CodecService.INDEX_CODEC_BLOOM_LOAD, random.nextBoolean());
if (random.nextBoolean()) {
builder.put(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, random.nextBoolean());
}
return builder;
}

View File

@ -168,7 +168,7 @@ public class TestSearchContext extends SearchContext {
}
@Override
public long nowInMillis() {
protected long nowInMillisImpl() {
return 0;
}