Core: Remove terms filter cache.

This is our only cache which is not 'exact' and might allow for stale results.
Additionally, the script index is a similar case: it also needs to perform
lookups in other indices in order to run queries, and for that index we rely
on the filesystem cache, so we should probably do the same for terms filter
lookups.

Close #9056
Adrien Grand 2014-12-24 11:27:45 +01:00
parent 236e2491b4
commit bc86796592
6 changed files with 26 additions and 228 deletions

View File

@@ -102,4 +102,11 @@ Some query builders have been removed or renamed:
* `filtered(...)` removed. Use `filteredQuery(...)` instead.
* `inQuery(...)` removed.
=== Terms filter lookup caching
The terms filter lookup mechanism does not support the `cache` option anymore
and relies on the filesystem cache instead. If the lookup index is not too
large, it is recommended to make it replicated to all nodes by setting
`index.auto_expand_replicas: 0-all` in order to remove the network overhead as
well.
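For example, assuming the lookup index is called `users` (an illustrative name,
any index works), the setting can be applied with the update index settings API:
[source,js]
--------------------------------------------------
curl -XPUT 'localhost:9200/users/_settings' -d '{
  "index.auto_expand_replicas" : "0-all"
}'
--------------------------------------------------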

View File

@@ -121,11 +121,6 @@ The terms lookup mechanism supports the following options:
A custom routing value to be used when retrieving the
external terms doc.
`cache`::
Whether to cache the filter built from the retrieved document
(`true` - default) or whether to fetch and rebuild the filter on every
request (`false`). See "<<query-dsl-terms-filter-lookup-caching,Terms lookup caching>>" below
The values for the `terms` filter will be fetched from a field in a
document with the specified id in the specified type and index.
Internally a get request is executed to fetch the values from the
@@ -137,28 +132,6 @@ across all nodes if the "reference" terms data is not large. The lookup
terms filter will prefer to execute the get request on a local node if
possible, reducing the need for networking.
["float",id="query-dsl-terms-filter-lookup-caching"]
==== Terms lookup caching
There is an additional cache involved, which caches the resolution of the
lookup document into the actual terms. This lookup cache is an LRU cache.
This cache has the following options:
`indices.cache.filter.terms.size`::
The size of the lookup cache. The default is `10mb`.
`indices.cache.filter.terms.expire_after_access`::
The time after the last read after which an entry should expire. Disabled by default.
`indices.cache.filter.terms.expire_after_write`::
The time after the last write after which an entry should expire. Disabled by default.
All options for the lookup document cache can only be configured
via the `elasticsearch.yml` file.
When using the terms lookup, the `execution` option isn't taken into
account and the filter behaves as if the execution mode was set to `plain`.
[float]
==== Terms lookup twitter example
@@ -194,19 +167,9 @@ curl -XGET localhost:9200/tweets/_search -d '{
}'
--------------------------------------------------
The above is highly optimized, both in the sense that the list of
followers will not be fetched if the filter is already cached in the
filter cache, and thanks to the internal LRU cache for fetching external
values for the terms filter. Also, the entry in the filter cache will not hold
`all` the terms, reducing the memory required for it.
Setting `_cache_key` is recommended, so that it is simple to clear the cache
associated with it using the clear cache API. For example:
[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/tweets/_cache/clear?filter_keys=user_2_friends'
--------------------------------------------------
If there are lots of matching values, then `_cache_key` is recommended to be set,
so that the filter cache will not store a reference to the potentially heavy
terms filter.
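As an illustration, reusing the index, type and field names from the twitter
example above, a lookup-based terms filter with `_cache_key` set could look
like this:
[source,js]
--------------------------------------------------
curl -XGET localhost:9200/tweets/_search -d '{
  "query" : {
    "filtered" : {
      "filter" : {
        "terms" : {
          "user" : {
            "index" : "users",
            "type" : "user",
            "id" : "2",
            "path" : "followers"
          },
          "_cache_key" : "user_2_friends"
        }
      }
    }
  }
}'
--------------------------------------------------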
The structure of the external terms document can also include an array of
inner objects, for example:

View File

@@ -33,11 +33,10 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -53,16 +52,14 @@ import static com.google.common.collect.Lists.newArrayList;
public class TransportClearIndicesCacheAction extends TransportBroadcastOperationAction<ClearIndicesCacheRequest, ClearIndicesCacheResponse, ShardClearIndicesCacheRequest, ShardClearIndicesCacheResponse> {
private final IndicesService indicesService;
private final IndicesTermsFilterCache termsFilterCache;
private final IndicesQueryCache indicesQueryCache;
@Inject
public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, IndicesTermsFilterCache termsFilterCache,
TransportService transportService, IndicesService indicesService,
IndicesQueryCache indicesQueryCache, ActionFilters actionFilters) {
super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
this.termsFilterCache = termsFilterCache;
this.indicesQueryCache = indicesQueryCache;
}
@@ -124,12 +121,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
if (request.filterCache()) {
clearedAtLeastOne = true;
service.cache().filter().clear("api");
termsFilterCache.clear("api");
}
if (request.filterKeys() != null && request.filterKeys().length > 0) {
clearedAtLeastOne = true;
service.cache().filter().clear("api", request.filterKeys());
termsFilterCache.clear("api", request.filterKeys());
}
if (request.fieldDataCache()) {
clearedAtLeastOne = true;
@@ -163,7 +158,6 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
} else {
service.cache().clear("api");
service.fieldData().clear();
termsFilterCache.clear("api");
indicesQueryCache.clear(shard);
}
}

View File

@@ -28,6 +28,9 @@ import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilterCachingPolicy;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.lucene.HashedBytesRef;
@@ -36,9 +39,9 @@ import org.elasticsearch.common.lucene.search.OrFilter;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.lucene.search.XBooleanFilter;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.filter.terms.TermsLookup;
import java.io.IOException;
@@ -52,7 +55,7 @@ import static org.elasticsearch.index.query.support.QueryParsers.wrapSmartNameFi
public class TermsFilterParser implements FilterParser {
public static final String NAME = "terms";
private IndicesTermsFilterCache termsFilterCache;
private Client client;
public static final String EXECUTION_KEY = "execution";
public static final String EXECUTION_VALUE_PLAIN = "plain";
@@ -74,8 +77,8 @@ public class TermsFilterParser implements FilterParser {
}
@Inject(optional = true)
public void setIndicesTermsFilterCache(IndicesTermsFilterCache termsFilterCache) {
this.termsFilterCache = termsFilterCache;
public void setClient(Client client) {
this.client = client;
}
@Override
@@ -92,7 +95,6 @@ public class TermsFilterParser implements FilterParser {
String lookupId = null;
String lookupPath = null;
String lookupRouting = null;
boolean lookupCache = true;
HashedBytesRef cacheKey = null;
XContentParser.Token token;
@@ -131,8 +133,6 @@
lookupPath = parser.text();
} else if ("routing".equals(currentFieldName)) {
lookupRouting = parser.textOrNull();
} else if ("cache".equals(currentFieldName)) {
lookupCache = parser.booleanValue();
} else {
throw new QueryParsingException(parseContext.index(), "[terms] filter does not support [" + currentFieldName + "] within lookup element");
}
@@ -166,7 +166,7 @@
throw new QueryParsingException(parseContext.index(), "terms filter requires a field name, followed by array of terms");
}
FieldMapper fieldMapper = null;
FieldMapper<?> fieldMapper = null;
smartNameFieldMappers = parseContext.smartFieldMappers(fieldName);
String[] previousTypes = null;
if (smartNameFieldMappers != null) {
@@ -181,15 +181,12 @@
}
if (lookupId != null) {
// if there are no mappings, then nothing has been indexed yet against this shard, so we can return
// no match (but not cached!), since the Terms Lookup relies on the fact that there are mappings...
if (fieldMapper == null) {
return Queries.MATCH_NO_FILTER;
final TermsLookup lookup = new TermsLookup(lookupIndex, lookupType, lookupId, lookupRouting, lookupPath, parseContext);
final GetResponse getResponse = client.get(new GetRequest(lookup.getIndex(), lookup.getType(), lookup.getId()).preference("_local").routing(lookup.getRouting())).actionGet();
if (getResponse.isExists()) {
List<Object> values = XContentMapValues.extractRawValues(lookup.getPath(), getResponse.getSourceAsMap());
terms.addAll(values);
}
// external lookup, use it
TermsLookup termsLookup = new TermsLookup(lookupIndex, lookupType, lookupId, lookupRouting, lookupPath, parseContext);
terms.addAll(termsFilterCache.terms(termsLookup, lookupCache, cacheKey));
}
if (terms.isEmpty()) {

View File

@@ -20,6 +20,7 @@
package org.elasticsearch.indices;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
@@ -27,7 +28,6 @@ import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -75,7 +75,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(IndicesFilterCache.class).asEagerSingleton();
bind(IndicesQueryCache.class).asEagerSingleton();
bind(IndicesFieldDataCache.class).asEagerSingleton();
bind(IndicesTermsFilterCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();

View File

@@ -1,162 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cache.filter.terms;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;
import com.google.common.collect.ImmutableList;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.HashedBytesRef;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
*/
public class IndicesTermsFilterCache extends AbstractComponent {
private static final long BASE_RAM_BYTES_STRING = RamUsageEstimator.shallowSizeOfInstance(String.class) + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
private static final long BASE_RAM_BYTES_BYTES_REF = RamUsageEstimator.shallowSizeOfInstance(BytesRef.class) + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
private static final TermsFilterValue NO_TERMS = new TermsFilterValue(0, ImmutableList.of());
private final Client client;
private final Cache<HashedBytesRef, TermsFilterValue> cache;
@Inject
public IndicesTermsFilterCache(Settings settings, Client client) {
super(settings);
this.client = client;
ByteSizeValue size = componentSettings.getAsBytesSize("size", new ByteSizeValue(10, ByteSizeUnit.MB));
TimeValue expireAfterWrite = componentSettings.getAsTime("expire_after_write", null);
TimeValue expireAfterAccess = componentSettings.getAsTime("expire_after_access", null);
CacheBuilder<HashedBytesRef, TermsFilterValue> builder = CacheBuilder.newBuilder()
.maximumWeight(size.bytes())
.weigher(new TermsFilterValueWeigher());
if (expireAfterAccess != null) {
builder.expireAfterAccess(expireAfterAccess.millis(), TimeUnit.MILLISECONDS);
}
if (expireAfterWrite != null) {
builder.expireAfterWrite(expireAfterWrite.millis(), TimeUnit.MILLISECONDS);
}
this.cache = builder.build();
}
public List<Object> terms(final TermsLookup lookup, boolean cacheLookup, @Nullable HashedBytesRef cacheKey) throws RuntimeException {
if (!cacheLookup) {
return buildTermsFilterValue(lookup).values;
}
HashedBytesRef key;
if (cacheKey != null) {
key = cacheKey;
} else {
key = new HashedBytesRef(lookup.toString());
}
try {
return cache.get(key, new Callable<TermsFilterValue>() {
@Override
public TermsFilterValue call() throws Exception {
return buildTermsFilterValue(lookup);
}
}).values;
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new ElasticsearchException(e.getMessage(), e.getCause());
}
}
TermsFilterValue buildTermsFilterValue(TermsLookup lookup) {
GetResponse getResponse = client.get(new GetRequest(lookup.getIndex(), lookup.getType(), lookup.getId()).preference("_local").routing(lookup.getRouting())).actionGet();
if (!getResponse.isExists()) {
return NO_TERMS;
}
List<Object> values = XContentMapValues.extractRawValues(lookup.getPath(), getResponse.getSourceAsMap());
if (values.isEmpty()) {
return NO_TERMS;
}
return new TermsFilterValue(estimateSizeInBytes(values), ImmutableList.copyOf(values));
}
long estimateSizeInBytes(List<Object> terms) {
long size = 8 + terms.size() * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
for (Object term : terms) {
if (term instanceof BytesRef) {
size += BASE_RAM_BYTES_BYTES_REF + ((BytesRef) term).length;
} else if (term instanceof String) {
size += BASE_RAM_BYTES_STRING + ((String) term).length() * RamUsageEstimator.NUM_BYTES_CHAR;
} else {
size += 4;
}
}
return size;
}
public void clear(String reason) {
cache.invalidateAll();
}
public void clear(String reason, String[] keys) {
for (String key : keys) {
cache.invalidate(new BytesRef(key));
}
}
static class TermsFilterValueWeigher implements Weigher<HashedBytesRef, TermsFilterValue> {
@Override
public int weigh(HashedBytesRef key, TermsFilterValue value) {
return (int) (key.bytes.length + value.sizeInBytes);
}
}
static class TermsFilterValue {
public final long sizeInBytes;
public final ImmutableList<Object> values;
TermsFilterValue(long sizeInBytes, ImmutableList<Object> values) {
this.sizeInBytes = sizeInBytes;
this.values = values;
}
}
}