From d6eebce9984fcc421fcecb8fbd18a720faf3bee2 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 3 Apr 2014 10:38:43 +0530 Subject: [PATCH] Revert "Merge pull request #457 from metamx/stream-cache" This reverts commit 5fae4d9abcacb44df8b0cd2f999987cff37a0aef, reversing changes made to 43a0554179f7d514b2e417e8045134656ab28e06. --- .../client/CachePopulatingQueryRunner.java | 36 +++++++------------ .../CachePopulatingQueryRunnerTest.java | 2 +- 2 files changed, 14 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java index 22d609d3dcf..a9997946542 100644 --- a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java @@ -20,8 +20,6 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; @@ -35,7 +33,6 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; -import javax.annotation.Nullable; import java.util.ArrayList; public class CachePopulatingQueryRunner implements QueryRunner @@ -75,33 +72,26 @@ public class CachePopulatingQueryRunner implements QueryRunner final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true")) && strategy != null - && cacheConfig.isPopulateCache(); - - final Sequence results = base.run(query); - + && cacheConfig.isPopulateCache() + // historical only populates distributed cache since the cache lookups are done at broker. + && !(cache instanceof MapCache); if (populateCache) { - final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey( + Sequence results = base.run(query); + Cache.NamedKey key = CacheUtil.computeSegmentCacheKey( segmentIdentifier, segmentDescriptor, strategy.computeCacheKey(query) ); - - final Function cacheFn = strategy.prepareForCache(); - return Sequences.map( - results, - new Function() - { - @Nullable - @Override - public T apply(@Nullable T input) - { - CacheUtil.populate(cache, mapper, key, ImmutableList.of(cacheFn.apply(input))); - return input; - } - } + ArrayList resultAsList = Sequences.toList(results, new ArrayList()); + CacheUtil.populate( + cache, + mapper, + key, + Lists.transform(resultAsList, strategy.prepareForCache()) ); + return Sequences.simple(resultAsList); } else { - return results; + return base.run(query); } } diff --git a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java index bb73c5d58bc..b87cb15b317 100644 --- a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java @@ -123,7 +123,7 @@ public class CachePopulatingQueryRunnerTest Sequence res = runner.run(builder.build()); // base sequence is not closed yet - Assert.assertFalse("sequence must not be closed", closable.isClosed()); + Assert.assertTrue(closable.isClosed()); ArrayList results = Sequences.toList(res, new ArrayList()); Assert.assertTrue(closable.isClosed()); Assert.assertEquals(expectedRes, results);