diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 0e63f9e4ac3..c1efc018fa5 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -31,6 +31,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.metamx.common.Pair; @@ -332,26 +333,30 @@ public class CachingClusteredClient implements QueryRunner clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)), new Function>() { - private final Function prepareForCache = strategy.prepareForCache(); + private final Function cacheFn = strategy.prepareForCache(); @Override public Sequence apply(Object input) { Result result = (Result) input; final BySegmentResultValueClass value = (BySegmentResultValueClass) result.getValue(); - String segmentIdentifier = value.getSegmentId(); - final Iterable segmentResults = value.getResults(); - CachePopulator cachePopulator = cachePopulatorMap.get( - String.format("%s_%s", segmentIdentifier, value.getInterval()) - ); - if (cachePopulator != null) { - cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache)); - } + final List cacheData = Lists.newLinkedList(); - return Sequences.simple( - Iterables.transform( - segmentResults, + return Sequences.withEffect( + Sequences.map( + Sequences.map( + Sequences.simple(value.getResults()), + new Function() + { + @Override + public T apply(T input) + { + cacheData.add(cacheFn.apply(input)); + return input; + } + } + ), toolChest.makeMetricManipulatorFn( rewrittenQuery, new MetricManipulationFn() @@ -363,7 +368,21 @@ public class CachingClusteredClient implements QueryRunner } } ) - ) + ), + new Runnable() + { + @Override + public void run() + { + CachePopulator cachePopulator = cachePopulatorMap.get( + String.format("%s_%s", value.getSegmentId(), value.getInterval()) + ); + if (cachePopulator != null) { + cachePopulator.populate(cacheData); + } + } + }, + MoreExecutors.sameThreadExecutor() ); } }