diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 505c5039ec9..99f0031e4f7 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -68,7 +68,6 @@ import org.joda.time.Interval; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -326,18 +325,52 @@ public class CachingClusteredClient implements QueryRunner continue; } - final Sequence resultSeqToAdd; final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); List intervals = segmentSpec.getIntervals(); + final Sequence resultSeqToAdd; if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable - resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); + if (!isBySegment) { + resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); + } else { + @SuppressWarnings("unchecked") + final Query>> bySegmentQuery = (Query>>) query; + @SuppressWarnings("unchecked") + final Sequence>> resultSequence = clientQueryable.run( + bySegmentQuery.withQuerySegmentSpec(segmentSpec), + responseContext + ); + + resultSeqToAdd = (Sequence) Sequences.map( + resultSequence, + new Function>, Result>>() + { + @Override + public Result> apply(Result> input) + { + final BySegmentResultValueClass bySegmentValue = input.getValue(); + return new Result<>( + input.getTimestamp(), + new BySegmentResultValueClass( + Lists.transform( + bySegmentValue.getResults(), + toolChest.makePreComputeManipulatorFn( + query, + MetricManipulatorFns.deserializing() + ) + ), + bySegmentValue.getSegmentId(), + bySegmentValue.getInterval() + ) + ); + } + } + ); + } } else { // Requires some manipulation on broker side - - final QueryRunner>> clientQueryableWithSegments = clientQueryable; - - final Sequence>> runningSequence = clientQueryableWithSegments.run( + @SuppressWarnings("unchecked") + final Sequence>> runningSequence = clientQueryable.run( rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext ); @@ -416,7 +449,6 @@ public class CachingClusteredClient implements QueryRunner ); } - listOfSequences.add( Pair.of( new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()),