diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 37637a0862c..4d602a8d1ad 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -19,6 +19,7 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -90,9 +91,11 @@ public class RetryQueryRunner implements QueryRunner if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) { throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); } + return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator); + } + else { + return Iterables.getOnlyElement(listOfSequences).toYielder(initValue, accumulator); } - - return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator); } }; } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 87d1a7eaeca..f2bc7c89d30 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -257,14 +257,14 @@ public class CachingClusteredClient implements QueryRunner @Override public Sequence get() { - ArrayList>> sequencesByInterval = Lists.newArrayList(); + ArrayList> sequencesByInterval = Lists.newArrayList(); addSequencesFromCache(sequencesByInterval); addSequencesFromServer(sequencesByInterval); return mergeCachedAndUncachedSequences(sequencesByInterval, toolChest); } - private void addSequencesFromCache(ArrayList>> listOfSequences) + private void addSequencesFromCache(ArrayList> listOfSequences) { if (strategy == null) { return; @@ -301,11 +301,11 @@ public class CachingClusteredClient implements QueryRunner } } ); - listOfSequences.add(Pair.of(cachedResultPair.lhs, Sequences.map(cachedSequence, pullFromCacheFunction))); + listOfSequences.add(Sequences.map(cachedSequence, pullFromCacheFunction)); } } - private void addSequencesFromServer(ArrayList>> listOfSequences) + private void addSequencesFromServer(ArrayList> listOfSequences) { listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size()); @@ -326,7 +326,6 @@ public class CachingClusteredClient implements QueryRunner } final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); - final List intervals = segmentSpec.getIntervals(); final Sequence resultSeqToAdd; if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable @@ -466,12 +465,7 @@ public class CachingClusteredClient implements QueryRunner ); } - listOfSequences.add( - Pair.of( - new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()), - resultSeqToAdd - ) - ); + listOfSequences.add(resultSeqToAdd); } } }// End of Supplier @@ -479,7 +473,7 @@ public class CachingClusteredClient implements QueryRunner } protected Sequence mergeCachedAndUncachedSequences( - List>> sequencesByInterval, + List> sequencesByInterval, QueryToolChest> toolChest ) { @@ -489,17 +483,7 @@ public class CachingClusteredClient implements QueryRunner return toolChest.mergeSequencesUnordered( Sequences.simple( - Lists.transform( - sequencesByInterval, - new Function>, Sequence>() - { - @Override - public Sequence apply(Pair> input) - { - return input.rhs; - } - } - ) + sequencesByInterval ) ); } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 664741a23e7..98421894624 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -762,31 +762,21 @@ public class CachingClusteredClientTest @Test public void testOutOfOrderSequenceMerging() throws Exception { - List>>> sequences = + List>> sequences = Lists.newArrayList( - Pair.of( - // this could ne the result of a historical node returning the merged result of - // a) an empty result for segment 2011-01-02/2011-01-05 - // and b) result for a second partition for 2011-01-05/2011-01-10 - new Interval("2011-01-02/2011-01-10"), - Sequences.simple( - makeTopNResults( - new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, - new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, - new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 - ) + Sequences.simple( + makeTopNResults( + new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, + new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 ) ), - - Pair.of( - new Interval("2011-01-05/2011-01-10"), - Sequences.simple( - makeTopNResults( - new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, - new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, - new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, - new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 - ) + Sequences.simple( + makeTopNResults( + new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, + new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 ) ) );