Adds trace logging to IndicesRequestCache (#34180)

* Adds trace logging to IndicesRequestCache

This change adds trace level logging to `IndicesrrequestCache` witht eh
primary aim of helping to identify the cause of teh failures in
https://github.com/elastic/elasticsearch/issues/32827. The cache will
log at trace level when a cache hit or miss occurs including the reader
version and the cache key. Note that this change adds a
`cacheKeyRenderer` whcih supplies a human readable String of the cache
key since the actual cache key itself is a `BytesReference` containing
the wire protocol serialised form of the request.

Logging is also added for the case where a search timeout occurs and fr
that reason the cache entry is invalidated.

* Adds comment to remaind us to remove cacheKeyRenderer
This commit is contained in:
Colin Goodheart-Smithe 2018-10-03 08:58:33 +01:00 committed by GitHub
parent d7893fd1e4
commit 2d64e3db9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 26 deletions

View File

@ -21,6 +21,9 @@ package org.elasticsearch.indices;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
@ -75,6 +78,8 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
public static final Setting<TimeValue> INDICES_CACHE_QUERY_EXPIRE =
Setting.positiveTimeSetting("indices.requests.cache.expire", new TimeValue(0), Property.NodeScope);
private static final Logger LOGGER = LogManager.getLogger(IndicesRequestCache.class);
private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
private final ByteSizeValue size;
@ -109,13 +114,19 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
notification.getKey().entity.onRemoval(notification);
}
// NORELEASE The cacheKeyRenderer has been added in order to debug
// https://github.com/elastic/elasticsearch/issues/32827, it should be
// removed when this issue is solved
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
DirectoryReader reader, BytesReference cacheKey) throws Exception {
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
key.entity.onMiss();
if (logger.isTraceEnabled()) {
logger.trace("Cache miss for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
}
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
@ -126,6 +137,9 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
}
} else {
key.entity.onHit();
if (logger.isTraceEnabled()) {
logger.trace("Cache hit for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
}
}
return value;
}

View File

@ -1191,7 +1191,9 @@ public class IndicesService extends AbstractLifecycleComponent
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();
boolean[] loadedFromCache = new boolean[] { true };
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> {
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), () -> {
return "Shard: " + request.shardId() + "\nSource:\n" + request.source();
}, out -> {
queryPhase.execute(context);
try {
context.queryResult().writeToNoId(out);
@ -1217,6 +1219,10 @@ public class IndicesService extends AbstractLifecycleComponent
// running a search that times out concurrently will likely timeout again if it's run while we have this `stale` result in the
// cache. One other option is to not cache requests with a timeout at all...
indicesRequestCache.invalidate(new IndexShardCacheEntity(context.indexShard()), directoryReader, request.cacheKey());
if (logger.isTraceEnabled()) {
logger.trace("Query timed out, invalidating cache entry for request on shard [{}]:\n {}", request.shardId(),
request.source());
}
}
}
@ -1232,8 +1238,8 @@ public class IndicesService extends AbstractLifecycleComponent
* @param loader loads the data into the cache if needed
* @return the contents of the cache or the result of calling the loader
*/
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey, Consumer<StreamOutput> loader)
throws Exception {
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey,
Supplier<String> cacheKeyRenderer, Consumer<StreamOutput> loader) throws Exception {
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
Supplier<BytesReference> supplier = () -> {
/* BytesStreamOutput allows to pass the expected size but by default uses
@ -1251,7 +1257,7 @@ public class IndicesService extends AbstractLifecycleComponent
return out.bytes();
}
};
return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey);
return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey, cacheKeyRenderer);
}
static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.joda.time.DateTimeZone;
import java.time.ZoneOffset;
@ -49,6 +50,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSear
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@TestLogging(value = "org.elasticsearch.indices.IndicesRequestCache:TRACE")
public class IndicesRequestCacheIT extends ESIntegTestCase {
// One of the primary purposes of the query cache is to cache aggs results
@ -417,8 +419,8 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
.getRequestCache();
// Check the hit count and miss count together so if they are not
// correct we can see both values
assertEquals(Arrays.asList(expectedHits, expectedMisses),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount()));
assertEquals(Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()));
}
}

View File

@ -31,7 +31,6 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@ -39,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
@ -68,7 +68,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
@ -79,7 +79,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
// cache hit
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
@ -126,7 +126,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
@ -140,7 +140,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
// cache the second
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(entity, loader, secondReader, termBytes);
value = cache.getOrCompute(entity, loader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
@ -152,7 +152,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes);
value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
@ -162,7 +162,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(2, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
@ -222,9 +222,9 @@ public class IndicesRequestCacheTests extends ESTestCase {
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
Loader secondLoader = new Loader(secondReader, 0);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
size = requestCacheStats.stats().getMemorySize();
IOUtils.close(reader, secondReader, writer, dir, cache);
@ -257,12 +257,12 @@ public class IndicesRequestCacheTests extends ESTestCase {
TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard);
Loader thirdLoader = new Loader(thirdReader, 0);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals("baz", value3.streamInput().readString());
assertEquals(2, cache.count());
assertEquals(1, requestCacheStats.stats().getEvictions());
@ -298,12 +298,12 @@ public class IndicesRequestCacheTests extends ESTestCase {
TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity);
Loader thirdLoader = new Loader(thirdReader, 0);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals("baz", value3.streamInput().readString());
assertEquals(3, cache.count());
final long hitCount = requestCacheStats.stats().getHitCount();
@ -312,7 +312,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
cache.cleanCache();
assertEquals(1, cache.count());
// third has not been validated since it's a different identity
value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
assertEquals("baz", value3.streamInput().readString());
@ -371,7 +371,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
@ -382,7 +382,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
// cache hit
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
@ -396,7 +396,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
cache.invalidate(entity, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());