Cache FieldStats

This caches FieldStats at the field level. For one off requests or for
few indicies this doesn't save anything, but when there are 30 indices,
5 shards, 1 replica, 100 parallel requests this is about twice as fast
as not caching. I expect lots of usage won't see much benefit from this
but pointing kibana to a cluster with many indexes and shards, will be
faster.

Closes #18717
This commit is contained in:
Nik Everett 2016-06-06 15:49:50 -04:00
parent e392e0b1df
commit 3032a7c653
8 changed files with 263 additions and 219 deletions

View File

@ -143,7 +143,7 @@ import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.explain.ExplainAction;
import org.elasticsearch.action.explain.TransportExplainAction;
import org.elasticsearch.action.fieldstats.FieldStatsAction;
import org.elasticsearch.action.fieldstats.TransportFieldStatsTransportAction;
import org.elasticsearch.action.fieldstats.TransportFieldStatsAction;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.MultiGetAction;
import org.elasticsearch.action.get.TransportGetAction;
@ -344,7 +344,7 @@ public class ActionModule extends AbstractModule {
registerAction(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
registerAction(DeleteStoredScriptAction.INSTANCE, TransportDeleteStoredScriptAction.class);
registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);
registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsAction.class);
registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);

View File

@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
@ -45,27 +44,23 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class TransportFieldStatsTransportAction extends
public class TransportFieldStatsAction extends
TransportBroadcastAction<FieldStatsRequest, FieldStatsResponse, FieldStatsShardRequest, FieldStatsShardResponse> {
private final IndicesService indicesService;
@Inject
public TransportFieldStatsTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
public TransportFieldStatsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService) {
@ -195,26 +190,18 @@ public class TransportFieldStatsTransportAction extends
MapperService mapperService = indexServices.mapperService();
IndexShard shard = indexServices.getShard(shardId.id());
try (Engine.Searcher searcher = shard.acquireSearcher("fieldstats")) {
// Resolve patterns and deduplicate
Set<String> fieldNames = new HashSet<>();
for (String field : request.getFields()) {
Collection<String> matchFields;
if (Regex.isSimpleMatchPattern(field)) {
matchFields = mapperService.simpleMatchToIndexNames(field);
} else {
matchFields = Collections.singleton(field);
}
for (String matchField : matchFields) {
MappedFieldType fieldType = mapperService.fullName(matchField);
if (fieldType == null) {
// ignore.
continue;
}
FieldStats<?> stats = fieldType.stats(searcher.reader());
if (stats != null) {
fieldStats.put(matchField, stats);
}
fieldNames.addAll(mapperService.simpleMatchToIndexNames(field));
}
for (String field : fieldNames) {
FieldStats<?> stats = indicesService.getFieldStats(shard, searcher, field);
if (stats != null) {
fieldStats.put(field, stats);
}
}
} catch (IOException e) {
} catch (Exception e) {
throw ExceptionsHelper.convertToElastic(e);
}
return new FieldStatsShardResponse(shardId, fieldStats);

View File

@ -20,15 +20,10 @@
package org.elasticsearch.index.cache.request;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesRequestCache;
/**
* Tracks the portion of the request cache in use for a particular shard.
*/
public final class ShardRequestCache {

View File

@ -145,7 +145,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final ShardSearchStats searchStats = new ShardSearchStats();
private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService;
private final ShardRequestCache shardQueryCache;
private final ShardRequestCache requestCacheStats;
private final ShardFieldData shardFieldData;
private final IndexFieldDataService indexFieldDataService;
private final ShardBitsetFilterCache shardBitsetFilterCache;
@ -241,7 +241,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger);
this.getService = new ShardGetService(indexSettings, this, mapperService);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
this.shardQueryCache = new ShardRequestCache();
this.requestCacheStats = new ShardRequestCache();
this.shardFieldData = new ShardFieldData();
this.indexFieldDataService = indexFieldDataService;
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
@ -303,7 +303,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public ShardRequestCache requestCache() {
return this.shardQueryCache;
return this.requestCacheStats;
}
public ShardFieldData fieldData() {

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.apache.lucene.index.DirectoryReader;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.shard.IndexShard;
import java.io.IOException;
/**
* Abstract base class for the an {@link IndexShard} level {@linkplain IndicesRequestCache.CacheEntity}.
*/
abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.CacheEntity {
@FunctionalInterface
public interface Loader {
void load(StreamOutput out) throws IOException;
}
private final Loader loader;
private boolean loadedFromCache = true;
protected AbstractIndexShardCacheEntity(Loader loader) {
this.loader = loader;
}
/**
* When called after passing this through
* {@link IndicesRequestCache#getOrCompute(IndicesRequestCache.CacheEntity, DirectoryReader, BytesReference)} this will return whether
* or not the result was loaded from the cache.
*/
public final boolean loadedFromCache() {
return loadedFromCache;
}
/**
* Get the {@linkplain ShardRequestCache} used to track cache statistics.
*/
protected abstract ShardRequestCache stats();
@Override
public final IndicesRequestCache.Value loadValue() throws IOException {
/* BytesStreamOutput allows to pass the expected size but by default uses
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful
* since we don't shrink to the actual size once we are done serializing.
* By passing 512 as the expected size we will resize the byte array in the stream
* slowly until we hit the page size and don't waste too much memory for small query
* results.*/
final int expectedSizeInBytes = 512;
try (BytesStreamOutput out = new BytesStreamOutput(expectedSizeInBytes)) {
loader.load(out);
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
final BytesReference reference = out.bytes();
loadedFromCache = false;
return new IndicesRequestCache.Value(reference, out.ramBytesUsed());
}
}
@Override
public final void onCached(IndicesRequestCache.Key key, IndicesRequestCache.Value value) {
stats().onCached(key, value);
}
@Override
public final void onHit() {
stats().onHit();
}
@Override
public final void onMiss() {
stats().onMiss();
}
@Override
public final void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> notification) {
stats().onRemoval(notification.getKey(), notification.getValue(),
notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
@ -28,6 +29,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.search.SearchType;
@ -39,12 +41,11 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
@ -71,9 +72,11 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.recovery.RecoveryStats;
@ -87,6 +90,7 @@ import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.AbstractIndexShardCacheEntity.Loader;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -1085,9 +1089,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService>
if (shard == null) {
return;
}
indicesRequestCache.clear(new IndexShardCacheEntity(shard));
indicesRequestCache.clear(new IndexShardCacheEntity(shard, null));
logger.trace("{} explicit cache clear", shard.shardId());
}
/**
* Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached
* value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows
@ -1096,10 +1101,13 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService>
*/
public void loadIntoContext(ShardSearchRequest request, SearchContext context, QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
final IndexShardCacheEntity entity = new IndexShardCacheEntity(context.indexShard(), queryPhase, context);
final IndexShardCacheEntity entity = new IndexShardCacheEntity(context.indexShard(), out -> {
queryPhase.execute(context);
context.queryResult().writeToNoId(out);
});
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();
final BytesReference bytesReference = indicesRequestCache.getOrCompute(entity, directoryReader, request.cacheKey());
if (entity.loaded == false) { // if we have loaded this we don't need to do anything
if (entity.loadedFromCache()) {
// restore the cached query result into the context
final QuerySearchResult result = context.queryResult();
StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry);
@ -1108,51 +1116,53 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService>
}
}
static final class IndexShardCacheEntity implements IndicesRequestCache.CacheEntity {
private final QueryPhase queryPhase;
private final SearchContext context;
/**
* Fetch {@linkplain FieldStats} for a field. These stats are cached until the shard changes.
* @param shard the shard to use with the cache key
* @param searcher searcher to use to lookup the field stats
* @param field the actual field
*/
public FieldStats<?> getFieldStats(IndexShard shard, Engine.Searcher searcher, String field) throws Exception {
MappedFieldType fieldType = shard.mapperService().fullName(field);
if (fieldType == null) {
return null;
}
BytesReference cacheKey = new BytesArray("fieldstats:" + field);
BytesReference statsRef = cacheShardLevelResult(shard, searcher.getDirectoryReader(), cacheKey, out -> {
out.writeOptionalWriteable(fieldType.stats(searcher.reader()));
});
try (StreamInput in = StreamInput.wrap(statsRef)) {
return in.readOptionalWriteable(FieldStats::readFrom);
}
}
/**
* Cache something calculated at the shard level.
* @param shard the shard this item is part of
* @param reader a reader for this shard. Used to invalidate the cache when there are changes.
* @param cacheKey key for the thing being cached within this shard
* @param loader loads the data into the cache if needed
* @return the contents of the cache or the result of calling the loader
*/
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey, Loader loader)
throws Exception {
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard, loader);
return indicesRequestCache.getOrCompute(cacheEntity, reader, cacheKey);
}
final static class IndexShardCacheEntity extends AbstractIndexShardCacheEntity {
private final IndexShard indexShard;
private final ShardRequestCache requestCache;
private boolean loaded = false;
IndexShardCacheEntity(IndexShard indexShard) {
this(indexShard, null, null);
}
public IndexShardCacheEntity(IndexShard indexShard, QueryPhase queryPhase, SearchContext context) {
this.queryPhase = queryPhase;
this.context = context;
protected IndexShardCacheEntity(IndexShard indexShard, Loader loader) {
super(loader);
this.indexShard = indexShard;
this.requestCache = indexShard.requestCache();
}
@Override
public IndicesRequestCache.Value loadValue() throws IOException {
queryPhase.execute(context);
/* BytesStreamOutput allows to pass the expected size but by default uses
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful
* since we don't shrink to the actual size once we are done serializing.
* By passing 512 as the expected size we will resize the byte array in the stream
* slowly until we hit the page size and don't waste too much memory for small query
* results.*/
final int expectedSizeInBytes = 512;
try (BytesStreamOutput out = new BytesStreamOutput(expectedSizeInBytes)) {
context.queryResult().writeToNoId(out);
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
final BytesReference reference = out.bytes();
loaded = true;
return new IndicesRequestCache.Value(reference, out.ramBytesUsed());
}
protected ShardRequestCache stats() {
return indexShard.requestCache();
}
@Override
public void onCached(IndicesRequestCache.Key key, IndicesRequestCache.Value value) {
requestCache.onCached(key, value);
}
@Override
public boolean isOpen() {
return indexShard.state() != IndexShardState.CLOSED;
@ -1162,22 +1172,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService>
public Object getCacheIdentity() {
return indexShard;
}
@Override
public void onHit() {
requestCache.onHit();
}
@Override
public void onMiss() {
requestCache.onMiss();
}
@Override
public void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> notification) {
requestCache.onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED);
}
}
@FunctionalInterface

View File

@ -20,14 +20,13 @@
package org.elasticsearch.fieldstats;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.fieldstats.FieldStatsAction;
import org.elasticsearch.action.fieldstats.FieldStatsResponse;
import org.elasticsearch.action.fieldstats.IndexConstraint;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.request.RequestCacheStats;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
@ -40,11 +39,12 @@ import static org.elasticsearch.action.fieldstats.IndexConstraint.Property.MAX;
import static org.elasticsearch.action.fieldstats.IndexConstraint.Property.MIN;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;
/**
* Tests for the {@link FieldStatsAction}.
*/
public class FieldStatsIntegrationIT extends ESIntegTestCase {
@ -150,7 +150,7 @@ public class FieldStatsIntegrationIT extends ESIntegTestCase {
.setFields("byte", "short", "integer", "long", "float", "double", "string").get();
assertAllSuccessful(response);
for (FieldStats stats : response.getAllFieldStats().values()) {
for (FieldStats<?> stats : response.getAllFieldStats().values()) {
assertThat(stats.getMaxDoc(), equalTo((long) numDocs));
assertThat(stats.getDocCount(), equalTo((long) numDocs));
assertThat(stats.getDensity(), equalTo(100));
@ -462,6 +462,33 @@ public class FieldStatsIntegrationIT extends ESIntegTestCase {
assertThat(response.getIndicesMergedFieldStats().get("test2").get("foo").getMaxValue(), equalTo(100L));
}
public void testCached() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").setSettings("index.number_of_replicas", 0));
indexRange("test", "value", 0, 99);
// First query should be a cache miss
FieldStatsResponse fieldStats = client().prepareFieldStats().setFields("value").get();
assertEquals(100, fieldStats.getAllFieldStats().get("value").getDocCount());
RequestCacheStats indexStats = client().admin().indices().prepareStats().get().getIndex("test").getTotal().getRequestCache();
assertEquals(0, indexStats.getHitCount());
assertThat(indexStats.getMemorySizeInBytes(), greaterThan(0L));
// Second query should be a cache hit
fieldStats = client().prepareFieldStats().setFields("value").get();
assertEquals(100, fieldStats.getAllFieldStats().get("value").getDocCount());
indexStats = client().admin().indices().prepareStats().get().getIndex("test").getTotal().getRequestCache();
assertThat(indexStats.getHitCount(), greaterThan(0L));
assertThat(indexStats.getMemorySizeInBytes(), greaterThan(0L));
// Indexing some new documents and refreshing should give you consistent data.
long oldHitCount = indexStats.getHitCount();
indexRange("test", "value", 100, 199);
fieldStats = client().prepareFieldStats().setFields("value").get();
assertEquals(200, fieldStats.getAllFieldStats().get("value").getDocCount());
// Because we refreshed the index we don't have any more hits in the cache. This is read from the index.
assertEquals(oldHitCount, indexStats.getHitCount());
}
private void indexRange(String index, long from, long to) throws Exception {
indexRange(index, "value", from, to);
}

View File

@ -30,9 +30,9 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -58,29 +58,30 @@ public class IndicesRequestCacheTests extends ESTestCase {
new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
AtomicBoolean indexShard = new AtomicBoolean(true);
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals("foo", StreamInput.wrap(value).readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertFalse(entity.loadedFromCache());
assertEquals(1, cache.count());
// cache hit
entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals("foo", StreamInput.wrap(value).readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertTrue(entity.loadedFromCache());
assertEquals(1, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length());
assertEquals(1, cache.numRegisteredCloseListeners());
// release
// Closing the cache doesn't modify an already returned CacheEntity
if (randomBoolean()) {
reader.close();
} else {
@ -91,7 +92,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertTrue(entity.loadedFromCache());
assertEquals(0, cache.count());
assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt());
@ -99,43 +100,6 @@ public class IndicesRequestCacheTests extends ESTestCase {
assertEquals(0, cache.numRegisteredCloseListeners());
}
public void testCacheWithDifferentEntityInstance() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
AtomicBoolean indexShard = new AtomicBoolean(true);
ShardRequestCache requestCacheStats = new ShardRequestCache();
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
// initial cache
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, cache.count());
assertEquals(1, cache.numRegisteredCloseListeners());
final int cacheSize = requestCacheStats.stats().getMemorySize().bytesAsInt();
value = cache.getOrCompute(new TestEntity(requestCacheStats, reader, indexShard, 0), reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, cache.count());
assertEquals(cacheSize, requestCacheStats.stats().getMemorySize().bytesAsInt());
assertEquals(1, cache.numRegisteredCloseListeners());
IOUtils.close(reader, writer, dir, cache);
}
public void testCacheDifferentReaders() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
AtomicBoolean indexShard = new AtomicBoolean(true);
@ -146,62 +110,60 @@ public class IndicesRequestCacheTests extends ESTestCase {
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals("foo", StreamInput.wrap(value).readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertFalse(entity.loadedFromCache());
assertEquals(1, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length());
final int cacheSize = requestCacheStats.stats().getMemorySize().bytesAsInt();
assertEquals(1, cache.numRegisteredCloseListeners());
// cache the second
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
value = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value.toUtf8());
assertEquals("bar", StreamInput.wrap(value).readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertFalse(secondEntity.loadedFromCache());
assertEquals(2, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > cacheSize + value.length());
assertEquals(2, cache.numRegisteredCloseListeners());
secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
value = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value.toUtf8());
assertEquals("bar", StreamInput.wrap(value).readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertTrue(secondEntity.loadedFromCache());
assertEquals(2, cache.count());
entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value.toUtf8());
assertEquals("foo", StreamInput.wrap(value).readString());
assertEquals(2, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertTrue(entity.loadedFromCache());
assertEquals(2, cache.count());
// Closing the cache doesn't change returned entities
reader.close();
cache.cleanCache();
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertTrue(entity.loadedFromCache());
assertTrue(secondEntity.loadedFromCache());
assertEquals(1, cache.count());
assertEquals(cacheSize, requestCacheStats.stats().getMemorySize().bytesAsInt());
assertEquals(1, cache.numRegisteredCloseListeners());
@ -217,14 +179,13 @@ public class IndicesRequestCacheTests extends ESTestCase {
cache.cleanCache();
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertEquals(1, entity.loaded);
assertEquals(1, secondEntity.loaded);
assertTrue(entity.loadedFromCache());
assertTrue(secondEntity.loadedFromCache());
assertEquals(0, cache.count());
assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt());
IOUtils.close(secondReader, writer, dir, cache);
assertEquals(0, cache.numRegisteredCloseListeners());
}
public void testEviction() throws Exception {
@ -248,9 +209,9 @@ public class IndicesRequestCacheTests extends ESTestCase {
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.toUtf8());
assertEquals("foo", StreamInput.wrap(value1).readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.toUtf8());
assertEquals("bar", StreamInput.wrap(value2).readString());
size = requestCacheStats.stats().getMemorySize();
IOUtils.close(reader, secondReader, writer, dir, cache);
}
@ -279,12 +240,12 @@ public class IndicesRequestCacheTests extends ESTestCase {
TestEntity thirddEntity = new TestEntity(requestCacheStats, thirdReader, indexShard, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.toUtf8());
assertEquals("foo", StreamInput.wrap(value1).readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.toUtf8());
assertEquals("bar", StreamInput.wrap(value2).readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
assertEquals("baz", value3.toUtf8());
assertEquals("baz", StreamInput.wrap(value3).readString());
assertEquals(2, cache.count());
assertEquals(1, requestCacheStats.stats().getEvictions());
IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
@ -316,12 +277,12 @@ public class IndicesRequestCacheTests extends ESTestCase {
TestEntity thirddEntity = new TestEntity(requestCacheStats, thirdReader, differentIdentity, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.toUtf8());
assertEquals("foo", StreamInput.wrap(value1).readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.toUtf8());
assertEquals("bar", StreamInput.wrap(value2).readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
assertEquals("baz", value3.toUtf8());
assertEquals("baz", StreamInput.wrap(value3).readString());
assertEquals(3, cache.count());
final long hitCount = requestCacheStats.stats().getHitCount();
// clear all for the indexShard Idendity even though is't still open
@ -331,7 +292,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
// third has not been validated since it's a different identity
value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
assertEquals("baz", value3.toUtf8());
assertEquals("baz", StreamInput.wrap(value3).readString());
IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
@ -343,59 +304,37 @@ public class IndicesRequestCacheTests extends ESTestCase {
StringField.TYPE_STORED));
}
private class TestEntity implements IndicesRequestCache.CacheEntity {
private final DirectoryReader reader;
private final int id;
private final AtomicBoolean identity;
private class TestEntity extends AbstractIndexShardCacheEntity {
private final AtomicBoolean standInForIndexShard;
private final ShardRequestCache shardRequestCache;
private int loaded;
private TestEntity(ShardRequestCache shardRequestCache, DirectoryReader reader, AtomicBoolean identity, int id) {
this.reader = reader;
this.id = id;
this.identity = identity;
private TestEntity(ShardRequestCache shardRequestCache, DirectoryReader reader, AtomicBoolean standInForIndexShard, int id) {
super(new Loader() {
@Override
public void load(StreamOutput out) throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(new TermQuery(new Term("id", Integer.toString(id))), 1);
assertEquals(1, topDocs.totalHits);
Document document = reader.document(topDocs.scoreDocs[0].doc);
out.writeString(document.get("value"));
}
});
this.standInForIndexShard = standInForIndexShard;
this.shardRequestCache = shardRequestCache;
}
@Override
public IndicesRequestCache.Value loadValue() throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(new TermQuery(new Term("id", Integer.toString(this.id))), 1);
assertEquals(1, topDocs.totalHits);
Document document = reader.document(topDocs.scoreDocs[0].doc);
BytesArray value = new BytesArray(document.get("value"));
loaded++;
return new IndicesRequestCache.Value(value, value.length());
}
@Override
public void onCached(IndicesRequestCache.Key key, IndicesRequestCache.Value value) {
shardRequestCache.onCached(key, value);
protected ShardRequestCache stats() {
return shardRequestCache;
}
@Override
public boolean isOpen() {
return identity.get();
return standInForIndexShard.get();
}
@Override
public Object getCacheIdentity() {
return identity;
}
@Override
public void onHit() {
shardRequestCache.onHit();
}
@Override
public void onMiss() {
shardRequestCache.onMiss();
}
@Override
public void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> notification) {
shardRequestCache.onRemoval(notification.getKey(), notification.getValue(),
notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED);
return standInForIndexShard;
}
}
}