Clear cache: allow to invalidate specific filter cache keys
closes #2653
This commit is contained in:
parent
c12c456192
commit
4714a6acc9
|
@ -35,6 +35,7 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
|
|||
private boolean fieldDataCache = false;
|
||||
private boolean idCache = false;
|
||||
private String[] fields = null;
|
||||
private String[] filterKeys = null;
|
||||
|
||||
ClearIndicesCacheRequest() {
|
||||
}
|
||||
|
@ -72,6 +73,15 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
|
|||
return this.fields;
|
||||
}
|
||||
|
||||
public ClearIndicesCacheRequest filterKeys(String... filterKeys) {
|
||||
this.filterKeys = filterKeys;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String[] filterKeys() {
|
||||
return this.filterKeys;
|
||||
}
|
||||
|
||||
public boolean idCache() {
|
||||
return this.idCache;
|
||||
}
|
||||
|
@ -86,13 +96,8 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
|
|||
filterCache = in.readBoolean();
|
||||
fieldDataCache = in.readBoolean();
|
||||
idCache = in.readBoolean();
|
||||
int size = in.readVInt();
|
||||
if (size > 0) {
|
||||
fields = new String[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
fields[i] = in.readString();
|
||||
}
|
||||
}
|
||||
fields = in.readStringArray();
|
||||
filterKeys = in.readStringArray();
|
||||
}
|
||||
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
|
@ -100,13 +105,7 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
|
|||
out.writeBoolean(filterCache);
|
||||
out.writeBoolean(fieldDataCache);
|
||||
out.writeBoolean(idCache);
|
||||
if (fields == null) {
|
||||
out.writeVInt(0);
|
||||
} else {
|
||||
out.writeVInt(fields.length);
|
||||
for (String field : fields) {
|
||||
out.writeString(field);
|
||||
}
|
||||
}
|
||||
out.writeStringArrayNullable(fields);
|
||||
out.writeStringArrayNullable(filterKeys);
|
||||
}
|
||||
}
|
|
@ -48,6 +48,11 @@ public class ClearIndicesCacheRequestBuilder extends BroadcastOperationRequestBu
|
|||
return this;
|
||||
}
|
||||
|
||||
public ClearIndicesCacheRequestBuilder setFilterKeys(String... filterKeys) {
|
||||
request.filterKeys(filterKeys);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClearIndicesCacheRequestBuilder setIdCache(boolean idCache) {
|
||||
request.idCache(idCache);
|
||||
return this;
|
||||
|
|
|
@ -34,6 +34,7 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
|
|||
private boolean fieldDataCache = false;
|
||||
private boolean idCache = false;
|
||||
private String[] fields = null;
|
||||
private String[] filterKeys = null;
|
||||
|
||||
ShardClearIndicesCacheRequest() {
|
||||
}
|
||||
|
@ -44,6 +45,7 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
|
|||
fieldDataCache = request.fieldDataCache();
|
||||
idCache = request.idCache();
|
||||
fields = request.fields();
|
||||
filterKeys = request.filterKeys();
|
||||
}
|
||||
|
||||
public boolean filterCache() {
|
||||
|
@ -62,6 +64,10 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
|
|||
return this.fields;
|
||||
}
|
||||
|
||||
public String[] filterKeys() {
|
||||
return this.filterKeys;
|
||||
}
|
||||
|
||||
public ShardClearIndicesCacheRequest waitForOperations(boolean waitForOperations) {
|
||||
this.filterCache = waitForOperations;
|
||||
return this;
|
||||
|
@ -73,13 +79,8 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
|
|||
filterCache = in.readBoolean();
|
||||
fieldDataCache = in.readBoolean();
|
||||
idCache = in.readBoolean();
|
||||
int size = in.readVInt();
|
||||
if (size > 0) {
|
||||
fields = new String[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
fields[i] = in.readString();
|
||||
}
|
||||
}
|
||||
fields = in.readStringArray();
|
||||
filterKeys = in.readStringArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,13 +89,7 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
|
|||
out.writeBoolean(filterCache);
|
||||
out.writeBoolean(fieldDataCache);
|
||||
out.writeBoolean(idCache);
|
||||
if (fields == null) {
|
||||
out.writeVInt(0);
|
||||
} else {
|
||||
out.writeVInt(fields.length);
|
||||
for (String field : fields) {
|
||||
out.writeString(field);
|
||||
}
|
||||
}
|
||||
out.writeStringArrayNullable(fields);
|
||||
out.writeStringArrayNullable(filterKeys);
|
||||
}
|
||||
}
|
|
@ -124,6 +124,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
|
|||
clearedAtLeastOne = true;
|
||||
service.cache().filter().clear("api");
|
||||
}
|
||||
if (request.filterKeys() != null && request.filterKeys().length > 0) {
|
||||
clearedAtLeastOne = true;
|
||||
service.cache().filter().clear("api", request.filterKeys());
|
||||
}
|
||||
if (request.fieldDataCache()) {
|
||||
clearedAtLeastOne = true;
|
||||
if (request.fields() == null || request.fields().length == 0) {
|
||||
|
|
|
@ -47,6 +47,8 @@ public interface FilterCache extends IndexComponent, CloseableComponent {
|
|||
|
||||
void clear(String reason);
|
||||
|
||||
void clear(String reason, String[] keys);
|
||||
|
||||
EntriesStats entriesStats();
|
||||
|
||||
long evictions();
|
||||
|
|
|
@ -59,6 +59,11 @@ public class NoneFilterCache extends AbstractIndexComponent implements FilterCac
|
|||
// nothing to do here
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(String reason, String[] keys) {
|
||||
// nothing to do there
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(IndexReader reader) {
|
||||
// nothing to do here
|
||||
|
|
|
@ -88,6 +88,16 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(String reason, String[] keys) {
|
||||
logger.debug("clear keys [], reason [{}]", reason, keys);
|
||||
for (String key : keys) {
|
||||
for (Object readerKey : seenReaders.keySet()) {
|
||||
indicesFilterCache.cache().invalidate(new FilterCacheKey(this, readerKey, new CacheKeyFilter.Key(key)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(SegmentReader owner) {
|
||||
clear(owner);
|
||||
|
|
|
@ -67,6 +67,7 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
|
|||
clearIndicesCacheRequest.fieldDataCache(request.paramAsBoolean("field_data", clearIndicesCacheRequest.fieldDataCache()));
|
||||
clearIndicesCacheRequest.idCache(request.paramAsBoolean("id", clearIndicesCacheRequest.idCache()));
|
||||
clearIndicesCacheRequest.fields(request.paramAsStringArray("fields", clearIndicesCacheRequest.fields()));
|
||||
clearIndicesCacheRequest.filterKeys(request.paramAsStringArray("filter_keys", clearIndicesCacheRequest.filterKeys()));
|
||||
|
||||
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD);
|
||||
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
|
||||
|
|
79
src/test/java/org/elasticsearch/test/integration/indices/cache/ClearCacheTests.java
vendored
Normal file
79
src/test/java/org/elasticsearch/test/integration/indices/cache/ClearCacheTests.java
vendored
Normal file
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.integration.indices.cache;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.index.query.FilterBuilders;
|
||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
/**
|
||||
*/
|
||||
@Test
|
||||
public class ClearCacheTests extends AbstractNodesTests {
|
||||
|
||||
private Client client;
|
||||
|
||||
@BeforeClass
|
||||
public void createNodes() throws Exception {
|
||||
startNode("node1", ImmutableSettings.settingsBuilder().put("index.cache.stats.refresh_interval", 0));
|
||||
client = getClient();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void closeNodes() {
|
||||
client.close();
|
||||
closeAllNodes();
|
||||
}
|
||||
|
||||
protected Client getClient() {
|
||||
return client("node1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearCacheFilterKeys() {
|
||||
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
|
||||
client.prepareIndex("test", "type", "1").setSource("field", "value").execute().actionGet();
|
||||
client.admin().indices().prepareRefresh().execute().actionGet();
|
||||
|
||||
NodesStatsResponse nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
|
||||
assertThat(nodesStats.nodes()[0].indices().getCache().filterSizeInBytes(), equalTo(0l));
|
||||
|
||||
SearchResponse searchResponse = client.prepareSearch().setQuery(filteredQuery(matchAllQuery(), FilterBuilders.termFilter("field", "value").cacheKey("test_key"))).execute().actionGet();
|
||||
assertThat(searchResponse.hits().getHits().length, equalTo(1));
|
||||
nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
|
||||
assertThat(nodesStats.nodes()[0].indices().getCache().filterSizeInBytes(), greaterThan(0l));
|
||||
|
||||
client.admin().indices().prepareClearCache().setFilterKeys("test_key").execute().actionGet();
|
||||
nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
|
||||
assertThat(nodesStats.nodes()[0].indices().getCache().filterSizeInBytes(), equalTo(0l));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue