diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java b/core/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java index 0fcda5c8fd5..e7bd76fb34d 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java @@ -130,6 +130,16 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo return value; } + /** + * Invalidates the given the cache entry for the given key and it's context + * @param cacheEntity the cache entity to invalidate for + * @param reader the reader to invalidate the cache entry for + * @param cacheKey the cache key to invalidate + */ + void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { + cache.invalidate(new Key(cacheEntity, reader.getVersion(), cacheKey)); + } + private static class Loader implements CacheLoader { private final CacheEntity entity; diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index 6778b941219..ee1048a81d9 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -1134,17 +1134,28 @@ public class IndicesService extends AbstractLifecycleComponent queryPhase.execute(context); try { context.queryResult().writeToNoId(out); + } catch (IOException e) { throw new AssertionError("Could not serialize response", e); } loadedFromCache[0] = false; }); + if (loadedFromCache[0]) { // restore the cached query result into the context final QuerySearchResult result = context.queryResult(); StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); result.readFromWithId(context.id(), in); result.shardTarget(context.shardTarget()); + } else if (context.queryResult().searchTimedOut()) { + // we have to invalidate the cache entry if we cached a query result form a request that timed out. + // we can't really throw exceptions in the loading part to signal a timed out search to the outside world since if there are + // multiple requests that wait for the cache entry to be calculated they'd fail all with the same exception. + // instead we all caching such a result for the time being, return the timed out result for all other searches with that cache + // key invalidate the result in the thread that caused the timeout. This will end up to be simpler and eventually correct since + // running a search that times out concurrently will likely timeout again if it's run while we have this `stale` result in the + // cache. One other option is to not cache requests with a timeout at all... + indicesRequestCache.invalidate(new IndexShardCacheEntity(context.indexShard()), directoryReader, request.cacheKey()); } } diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java index 5d5584a156f..f129bdea398 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java @@ -169,7 +169,6 @@ public class IndicesRequestCacheTests extends ESTestCase { assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); assertTrue(loader.loadedFromCache); - assertTrue(loader.loadedFromCache); assertEquals(1, cache.count()); assertEquals(cacheSize, requestCacheStats.stats().getMemorySize().bytesAsInt()); assertEquals(1, cache.numRegisteredCloseListeners()); @@ -186,7 +185,6 @@ public class IndicesRequestCacheTests extends ESTestCase { assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); assertTrue(loader.loadedFromCache); - assertTrue(loader.loadedFromCache); assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); @@ -215,7 +213,7 @@ public class IndicesRequestCacheTests extends ESTestCase { new ShardId("foo", "bar", 1)); TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); Loader secondLoader = new Loader(secondReader, 0); - + BytesReference value1 = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes()); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termQuery.buildAsBytes()); @@ -347,6 +345,74 @@ public class IndicesRequestCacheTests extends ESTestCase { } + public void testInvalidate() throws Exception { + ShardRequestCache requestCacheStats = new ShardRequestCache(); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), + new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + AtomicBoolean indexShard = new AtomicBoolean(true); + + // initial cache + TestEntity entity = new TestEntity(requestCacheStats, indexShard); + Loader loader = new Loader(reader, 0); + BytesReference value = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes()); + assertEquals("foo", value.streamInput().readString()); + assertEquals(0, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertFalse(loader.loadedFromCache); + assertEquals(1, cache.count()); + + // cache hit + entity = new TestEntity(requestCacheStats, indexShard); + loader = new Loader(reader, 0); + value = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes()); + assertEquals("foo", value.streamInput().readString()); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(1, cache.count()); + assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); + assertEquals(1, cache.numRegisteredCloseListeners()); + + // load again after invalidate + entity = new TestEntity(requestCacheStats, indexShard); + loader = new Loader(reader, 0); + cache.invalidate(entity, reader, termQuery.buildAsBytes()); + value = cache.getOrCompute(entity, loader, reader, termQuery.buildAsBytes()); + assertEquals("foo", value.streamInput().readString()); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(2, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertFalse(loader.loadedFromCache); + assertEquals(1, cache.count()); + assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); + assertEquals(1, cache.numRegisteredCloseListeners()); + + // release + if (randomBoolean()) { + reader.close(); + } else { + indexShard.set(false); // closed shard but reader is still open + cache.clear(entity); + } + cache.cleanCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(2, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertEquals(0, cache.count()); + assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); + + IOUtils.close(reader, writer, dir, cache); + assertEquals(0, cache.numRegisteredCloseListeners()); + } + private class TestEntity extends AbstractIndexShardCacheEntity { private final AtomicBoolean standInForIndexShard; private final ShardRequestCache shardRequestCache;