From ee344154705ca594abca9f6ca6351f650a802c34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 1 Apr 2014 22:11:58 -0700 Subject: [PATCH 1/2] populate cache without materializing results --- .../client/CachePopulatingQueryRunner.java | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java index a9997946542..22d609d3dcf 100644 --- a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java @@ -20,6 +20,8 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; @@ -33,6 +35,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; +import javax.annotation.Nullable; import java.util.ArrayList; public class CachePopulatingQueryRunner implements QueryRunner @@ -72,26 +75,33 @@ public class CachePopulatingQueryRunner implements QueryRunner final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true")) && strategy != null - && cacheConfig.isPopulateCache() - // historical only populates distributed cache since the cache lookups are done at broker. - && !(cache instanceof MapCache); + && cacheConfig.isPopulateCache(); + + final Sequence results = base.run(query); + if (populateCache) { - Sequence results = base.run(query); - Cache.NamedKey key = CacheUtil.computeSegmentCacheKey( + final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey( segmentIdentifier, segmentDescriptor, strategy.computeCacheKey(query) ); - ArrayList resultAsList = Sequences.toList(results, new ArrayList()); - CacheUtil.populate( - cache, - mapper, - key, - Lists.transform(resultAsList, strategy.prepareForCache()) + + final Function cacheFn = strategy.prepareForCache(); + return Sequences.map( + results, + new Function() + { + @Nullable + @Override + public T apply(@Nullable T input) + { + CacheUtil.populate(cache, mapper, key, ImmutableList.of(cacheFn.apply(input))); + return input; + } + } ); - return Sequences.simple(resultAsList); } else { - return base.run(query); + return results; } } From 458a873505a911c535775b0b4a52dda2098599dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 2 Apr 2014 15:37:54 -0700 Subject: [PATCH 2/2] make test reflect the fact that resulting sequence is not consumed yet --- .../java/io/druid/client/CachePopulatingQueryRunnerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java index b87cb15b317..bb73c5d58bc 100644 --- a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java @@ -123,7 +123,7 @@ public class CachePopulatingQueryRunnerTest Sequence res = runner.run(builder.build()); // base sequence is not closed yet - Assert.assertTrue(closable.isClosed()); + Assert.assertFalse("sequence must not be closed", closable.isClosed()); ArrayList results = Sequences.toList(res, new ArrayList()); Assert.assertTrue(closable.isClosed()); Assert.assertEquals(expectedRes, results);