Fix the request cache keys to not hold references to the SearchContext. (#21284)

Currently the request cache adds a `CacheEntity` object. It looks quite innocent
but in practice it has a reference to a lambda that knows how to create a value.
The issue is that this lambda has implicit references to the SearchContext
object, which can be quite heavy since it holds a structured representation of
aggregations for instance.

This pull request splits the `CacheEntity` object from the object that generates
cache values.
This commit is contained in:
Adrien Grand 2016-11-03 08:58:15 +01:00 committed by GitHub
parent 5f77ddf45d
commit 5d79eab982
5 changed files with 170 additions and 161 deletions

View File

@ -123,7 +123,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream {
}
@Override
public void close() throws IOException {
public void close() {
// empty for now.
}

View File

@ -19,40 +19,15 @@
package org.elasticsearch.indices;
import org.apache.lucene.index.DirectoryReader;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.shard.IndexShard;
import java.io.IOException;
/**
* Abstract base class for the an {@link IndexShard} level {@linkplain IndicesRequestCache.CacheEntity}.
*/
abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.CacheEntity {
@FunctionalInterface
public interface Loader {
void load(StreamOutput out) throws IOException;
}
private final Loader loader;
private boolean loadedFromCache = true;
protected AbstractIndexShardCacheEntity(Loader loader) {
this.loader = loader;
}
/**
* When called after passing this through
* {@link IndicesRequestCache#getOrCompute(IndicesRequestCache.CacheEntity, DirectoryReader, BytesReference)} this will return whether
* or not the result was loaded from the cache.
*/
public final boolean loadedFromCache() {
return loadedFromCache;
}
/**
* Get the {@linkplain ShardRequestCache} used to track cache statistics.
@ -60,27 +35,7 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach
protected abstract ShardRequestCache stats();
@Override
public final IndicesRequestCache.Value loadValue() throws IOException {
/* BytesStreamOutput allows to pass the expected size but by default uses
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful
* since we don't shrink to the actual size once we are done serializing.
* By passing 512 as the expected size we will resize the byte array in the stream
* slowly until we hit the page size and don't waste too much memory for small query
* results.*/
final int expectedSizeInBytes = 512;
try (BytesStreamOutput out = new BytesStreamOutput(expectedSizeInBytes)) {
loader.load(out);
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
final BytesReference reference = out.bytes();
loadedFromCache = false;
return new IndicesRequestCache.Value(reference, out.ramBytesUsed());
}
}
@Override
public final void onCached(IndicesRequestCache.Key key, IndicesRequestCache.Value value) {
public final void onCached(IndicesRequestCache.Key key, BytesReference value) {
stats().onCached(key, value);
}
@ -95,7 +50,7 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach
}
@Override
public final void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> notification) {
public final void onRemoval(RemovalNotification<IndicesRequestCache.Key, BytesReference> notification) {
stats().onRemoval(notification.getKey(), notification.getValue(),
notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED);
}

View File

@ -41,12 +41,12 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
/**
* The indices request cache allows to cache a shard level request stage responses, helping with improving
@ -62,7 +62,7 @@ import java.util.concurrent.ConcurrentMap;
* is functional.
*/
public final class IndicesRequestCache extends AbstractComponent implements RemovalListener<IndicesRequestCache.Key,
IndicesRequestCache.Value>, Closeable {
BytesReference>, Closeable {
/**
* A setting to enable or disable request caching on an index level. Its dynamic by default
@ -79,14 +79,14 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
private final ByteSizeValue size;
private final TimeValue expire;
private final Cache<Key, Value> cache;
private final Cache<Key, BytesReference> cache;
IndicesRequestCache(Settings settings) {
super(settings);
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.<Key, Value>builder()
CacheBuilder<Key, BytesReference> cacheBuilder = CacheBuilder.<Key, BytesReference>builder()
.setMaximumWeight(sizeInBytes).weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()).removalListener(this);
if (expire != null) {
cacheBuilder.setExpireAfterAccess(expire);
@ -105,15 +105,16 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
}
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
notification.getKey().entity.onRemoval(notification);
}
BytesReference getOrCompute(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) throws Exception {
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
DirectoryReader reader, BytesReference cacheKey) throws Exception {
final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
Loader loader = new Loader(cacheEntity);
Value value = cache.computeIfAbsent(key, loader);
if (loader.isLoaded()) {
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
key.entity.onMiss();
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
@ -126,16 +127,18 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
} else {
key.entity.onHit();
}
return value.reference;
return value;
}
private static class Loader implements CacheLoader<Key, Value> {
private static class Loader implements CacheLoader<Key, BytesReference> {
private final CacheEntity entity;
private final Supplier<BytesReference> loader;
private boolean loaded;
Loader(CacheEntity entity) {
Loader(CacheEntity entity, Supplier<BytesReference> loader) {
this.entity = entity;
this.loader = loader;
}
public boolean isLoaded() {
@ -143,8 +146,8 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
}
@Override
public Value load(Key key) throws Exception {
Value value = entity.loadValue();
public BytesReference load(Key key) throws Exception {
BytesReference value = loader.get();
entity.onCached(key, value);
loaded = true;
return value;
@ -154,16 +157,12 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
/**
* Basic interface to make this cache testable.
*/
interface CacheEntity {
/**
* Loads the actual cache value. this is the heavy lifting part.
*/
Value loadValue() throws IOException;
interface CacheEntity extends Accountable {
/**
* Called after the value was loaded via {@link #loadValue()}
* Called after the value was loaded.
*/
void onCached(Key key, Value value);
void onCached(Key key, BytesReference value);
/**
* Returns <code>true</code> iff the resource behind this entity is still open ie.
@ -190,32 +189,12 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
/**
* Called when this entity instance is removed
*/
void onRemoval(RemovalNotification<Key, Value> notification);
}
static class Value implements Accountable {
final BytesReference reference;
final long ramBytesUsed;
Value(BytesReference reference, long ramBytesUsed) {
this.reference = reference;
this.ramBytesUsed = ramBytesUsed;
}
@Override
public long ramBytesUsed() {
return ramBytesUsed;
}
@Override
public Collection<Accountable> getChildResources() {
return Collections.emptyList();
}
void onRemoval(RemovalNotification<Key, BytesReference> notification);
}
static class Key implements Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);
public final CacheEntity entity; // use as identity equality
public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
public final BytesReference value;
@ -228,7 +207,7 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
@Override
public long ramBytesUsed() {
return RamUsageEstimator.NUM_BYTES_OBJECT_REF + Long.BYTES + value.length();
return BASE_RAM_BYTES_USED + entity.ramBytesUsed() + value.length();
}
@Override

View File

@ -23,11 +23,11 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
@ -51,9 +51,11 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -98,7 +100,6 @@ import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.AbstractIndexShardCacheEntity.Loader;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -132,8 +133,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
@ -1110,7 +1113,7 @@ public class IndicesService extends AbstractLifecycleComponent
if (shard == null) {
return;
}
indicesRequestCache.clear(new IndexShardCacheEntity(shard, null));
indicesRequestCache.clear(new IndexShardCacheEntity(shard));
logger.trace("{} explicit cache clear", shard.shardId());
}
@ -1122,13 +1125,19 @@ public class IndicesService extends AbstractLifecycleComponent
*/
public void loadIntoContext(ShardSearchRequest request, SearchContext context, QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
final IndexShardCacheEntity entity = new IndexShardCacheEntity(context.indexShard(), out -> {
queryPhase.execute(context);
context.queryResult().writeToNoId(out);
});
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();
final BytesReference bytesReference = indicesRequestCache.getOrCompute(entity, directoryReader, request.cacheKey());
if (entity.loadedFromCache()) {
boolean[] loadedFromCache = new boolean[] { true };
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> {
queryPhase.execute(context);
try {
context.queryResult().writeToNoId(out);
} catch (IOException e) {
throw new AssertionError("Could not serialize response", e);
}
loadedFromCache[0] = false;
});
if (loadedFromCache[0]) {
// restore the cached query result into the context
final QuerySearchResult result = context.queryResult();
StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry);
@ -1154,7 +1163,11 @@ public class IndicesService extends AbstractLifecycleComponent
}
BytesReference cacheKey = new BytesArray("fieldstats:" + field);
BytesReference statsRef = cacheShardLevelResult(shard, searcher.getDirectoryReader(), cacheKey, out -> {
out.writeOptionalWriteable(fieldType.stats(searcher.reader()));
try {
out.writeOptionalWriteable(fieldType.stats(searcher.reader()));
} catch (IOException e) {
throw new IllegalStateException("Failed to write field stats output", e);
}
});
try (StreamInput in = statsRef.streamInput()) {
return in.readOptionalWriteable(FieldStats::readFrom);
@ -1173,17 +1186,33 @@ public class IndicesService extends AbstractLifecycleComponent
* @param loader loads the data into the cache if needed
* @return the contents of the cache or the result of calling the loader
*/
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey, Loader loader)
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey, Consumer<StreamOutput> loader)
throws Exception {
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard, loader);
return indicesRequestCache.getOrCompute(cacheEntity, reader, cacheKey);
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
Supplier<BytesReference> supplier = () -> {
/* BytesStreamOutput allows to pass the expected size but by default uses
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful
* since we don't shrink to the actual size once we are done serializing.
* By passing 512 as the expected size we will resize the byte array in the stream
* slowly until we hit the page size and don't waste too much memory for small query
* results.*/
final int expectedSizeInBytes = 512;
try (BytesStreamOutput out = new BytesStreamOutput(expectedSizeInBytes)) {
loader.accept(out);
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
return out.bytes();
}
};
return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey);
}
static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class);
private final IndexShard indexShard;
protected IndexShardCacheEntity(IndexShard indexShard, Loader loader) {
super(loader);
protected IndexShardCacheEntity(IndexShard indexShard) {
this.indexShard = indexShard;
}
@ -1201,6 +1230,13 @@ public class IndicesService extends AbstractLifecycleComponent
public Object getCacheIdentity() {
return indexShard;
}
@Override
public long ramBytesUsed() {
// No need to take the IndexShard into account since it is shared
// across many entities
return BASE_RAM_BYTES_USED;
}
}
@FunctionalInterface

View File

@ -31,7 +31,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -43,6 +43,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class IndicesRequestCacheTests extends ESTestCase {
@ -59,23 +60,25 @@ public class IndicesRequestCacheTests extends ESTestCase {
AtomicBoolean indexShard = new AtomicBoolean(true);
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertFalse(entity.loadedFromCache());
assertFalse(loader.loadedFromCache);
assertEquals(1, cache.count());
// cache hit
entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertTrue(entity.loadedFromCache());
assertTrue(loader.loadedFromCache);
assertEquals(1, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length());
assertEquals(1, cache.numRegisteredCloseListeners());
@ -91,7 +94,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertTrue(entity.loadedFromCache());
assertTrue(loader.loadedFromCache);
assertEquals(0, cache.count());
assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt());
@ -114,46 +117,50 @@ public class IndicesRequestCacheTests extends ESTestCase {
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
BytesReference value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertFalse(entity.loadedFromCache());
assertFalse(loader.loadedFromCache);
assertEquals(1, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length());
final int cacheSize = requestCacheStats.stats().getMemorySize().bytesAsInt();
assertEquals(1, cache.numRegisteredCloseListeners());
// cache the second
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
value = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(entity, loader, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertFalse(secondEntity.loadedFromCache());
assertFalse(loader.loadedFromCache);
assertEquals(2, cache.count());
assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > cacheSize + value.length());
assertEquals(2, cache.numRegisteredCloseListeners());
secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
value = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(secondEntity, loader, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertTrue(secondEntity.loadedFromCache());
assertTrue(loader.loadedFromCache);
assertEquals(2, cache.count());
entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
value = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes());
assertEquals("foo", value.streamInput().readString());
assertEquals(2, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertTrue(entity.loadedFromCache());
assertTrue(loader.loadedFromCache);
assertEquals(2, cache.count());
// Closing the cache doesn't change returned entities
@ -161,8 +168,8 @@ public class IndicesRequestCacheTests extends ESTestCase {
cache.cleanCache();
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertTrue(entity.loadedFromCache());
assertTrue(secondEntity.loadedFromCache());
assertTrue(loader.loadedFromCache);
assertTrue(loader.loadedFromCache);
assertEquals(1, cache.count());
assertEquals(cacheSize, requestCacheStats.stats().getMemorySize().bytesAsInt());
assertEquals(1, cache.numRegisteredCloseListeners());
@ -178,8 +185,8 @@ public class IndicesRequestCacheTests extends ESTestCase {
cache.cleanCache();
assertEquals(2, requestCacheStats.stats().getMissCount());
assertEquals(0, requestCacheStats.stats().getEvictions());
assertTrue(entity.loadedFromCache());
assertTrue(secondEntity.loadedFromCache());
assertTrue(loader.loadedFromCache);
assertTrue(loader.loadedFromCache);
assertEquals(0, cache.count());
assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt());
@ -200,16 +207,18 @@ public class IndicesRequestCacheTests extends ESTestCase {
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
Loader secondLoader = new Loader(secondReader, 0);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.streamInput().readString());
size = requestCacheStats.stats().getMemorySize();
IOUtils.close(reader, secondReader, writer, dir, cache);
@ -226,24 +235,27 @@ public class IndicesRequestCacheTests extends ESTestCase {
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
Loader secondLoader = new Loader(secondReader, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
DirectoryReader thirdReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TestEntity thirddEntity = new TestEntity(requestCacheStats, thirdReader, indexShard, 0);
TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard);
Loader thirdLoader = new Loader(thirdReader, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termQuery.buildAsBytes());
assertEquals("baz", value3.streamInput().readString());
assertEquals(2, cache.count());
assertEquals(1, requestCacheStats.stats().getEvictions());
@ -262,25 +274,28 @@ public class IndicesRequestCacheTests extends ESTestCase {
DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
TestEntity entity = new TestEntity(requestCacheStats, reader, indexShard, 0);
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
TestEntity secondEntity = new TestEntity(requestCacheStats, secondReader, indexShard, 0);
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
Loader secondLoader = new Loader(secondReader, 0);
writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
DirectoryReader thirdReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer),
new ShardId("foo", "bar", 1));
AtomicBoolean differentIdentity = new AtomicBoolean(true);
TestEntity thirddEntity = new TestEntity(requestCacheStats, thirdReader, differentIdentity, 0);
TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity);
Loader thirdLoader = new Loader(thirdReader, 0);
BytesReference value1 = cache.getOrCompute(entity, reader, termQuery.buildAsBytes());
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondReader, termQuery.buildAsBytes());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termQuery.buildAsBytes());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termQuery.buildAsBytes());
assertEquals("baz", value3.streamInput().readString());
assertEquals(3, cache.count());
final long hitCount = requestCacheStats.stats().getHitCount();
@ -289,7 +304,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
cache.cleanCache();
assertEquals(1, cache.count());
// third has not been validated since it's a different identity
value3 = cache.getOrCompute(thirddEntity, thirdReader, termQuery.buildAsBytes());
value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termQuery.buildAsBytes());
assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
assertEquals("baz", value3.streamInput().readString());
@ -303,20 +318,39 @@ public class IndicesRequestCacheTests extends ESTestCase {
StringField.TYPE_STORED));
}
private static class Loader implements Supplier<BytesReference> {
private final DirectoryReader reader;
private final int id;
public boolean loadedFromCache = true;
public Loader(DirectoryReader reader, int id) {
super();
this.reader = reader;
this.id = id;
}
@Override
public BytesReference get() {
try (BytesStreamOutput out = new BytesStreamOutput()) {
IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(new TermQuery(new Term("id", Integer.toString(id))), 1);
assertEquals(1, topDocs.totalHits);
Document document = reader.document(topDocs.scoreDocs[0].doc);
out.writeString(document.get("value"));
loadedFromCache = false;
return out.bytes();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private class TestEntity extends AbstractIndexShardCacheEntity {
private final AtomicBoolean standInForIndexShard;
private final ShardRequestCache shardRequestCache;
private TestEntity(ShardRequestCache shardRequestCache, DirectoryReader reader, AtomicBoolean standInForIndexShard, int id) {
super(new Loader() {
@Override
public void load(StreamOutput out) throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(new TermQuery(new Term("id", Integer.toString(id))), 1);
assertEquals(1, topDocs.totalHits);
Document document = reader.document(topDocs.scoreDocs[0].doc);
out.writeString(document.get("value"));
}
});
private TestEntity(ShardRequestCache shardRequestCache, AtomicBoolean standInForIndexShard) {
this.standInForIndexShard = standInForIndexShard;
this.shardRequestCache = shardRequestCache;
}
@ -335,5 +369,10 @@ public class IndicesRequestCacheTests extends ESTestCase {
public Object getCacheIdentity() {
return standInForIndexShard;
}
@Override
public long ramBytesUsed() {
return 42;
}
}
}