diff --git a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java index a9997946542..22d609d3dcf 100644 --- a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java @@ -20,6 +20,8 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; @@ -33,6 +35,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; +import javax.annotation.Nullable; import java.util.ArrayList; public class CachePopulatingQueryRunner implements QueryRunner @@ -72,26 +75,33 @@ public class CachePopulatingQueryRunner implements QueryRunner final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true")) && strategy != null - && cacheConfig.isPopulateCache() - // historical only populates distributed cache since the cache lookups are done at broker. - && !(cache instanceof MapCache); + && cacheConfig.isPopulateCache(); + + final Sequence results = base.run(query); + if (populateCache) { - Sequence results = base.run(query); - Cache.NamedKey key = CacheUtil.computeSegmentCacheKey( + final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey( segmentIdentifier, segmentDescriptor, strategy.computeCacheKey(query) ); - ArrayList resultAsList = Sequences.toList(results, new ArrayList()); - CacheUtil.populate( - cache, - mapper, - key, - Lists.transform(resultAsList, strategy.prepareForCache()) + + final Function cacheFn = strategy.prepareForCache(); + return Sequences.map( + results, + new Function() + { + @Nullable + @Override + public T apply(@Nullable T input) + { + CacheUtil.populate(cache, mapper, key, ImmutableList.of(cacheFn.apply(input))); + return input; + } + } ); - return Sequences.simple(resultAsList); } else { - return base.run(query); + return results; } } diff --git a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java index b87cb15b317..bb73c5d58bc 100644 --- a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java @@ -123,7 +123,7 @@ public class CachePopulatingQueryRunnerTest Sequence res = runner.run(builder.build()); // base sequence is not closed yet - Assert.assertTrue(closable.isClosed()); + Assert.assertFalse("sequence must not be closed", closable.isClosed()); ArrayList results = Sequences.toList(res, new ArrayList()); Assert.assertTrue(closable.isClosed()); Assert.assertEquals(expectedRes, results);