diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 10af6f0d8e7..9c91cc1a3a8 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -25,6 +25,7 @@ import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -35,6 +36,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.guava.BaseSequence; +import com.metamx.common.guava.Comparators; import com.metamx.common.guava.LazySequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -43,6 +45,7 @@ import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; +import io.druid.common.utils.JodaUtils; import io.druid.guice.annotations.Smile; import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; @@ -121,7 +124,7 @@ public class CachingClusteredClient implements QueryRunner final Map> serverSegments = Maps.newTreeMap(); - final List> cachedResults = Lists.newArrayList(); + final List> cachedResults = Lists.newArrayList(); final Map cachePopulatorMap = Maps.newHashMap(); final boolean useCache = query.getContextUseCache(true) @@ -214,7 +217,7 @@ public class CachingClusteredClient implements QueryRunner if (cachedValue != null) { // remove cached segment from set of segments to query segments.remove(segment); - cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue)); + cachedResults.add(Pair.of(segmentQueryInterval, cachedValue)); } else if (populateCache) { final String segmentIdentifier = segment.lhs.getSegment().getIdentifier(); cachePopulatorMap.put( @@ -250,35 +253,47 @@ public class CachingClusteredClient implements QueryRunner @Override public Sequence get() { - ArrayList>> listOfSequences = Lists.newArrayList(); + ArrayList>> sequencesByInterval = Lists.newArrayList(); + addSequencesFromCache(sequencesByInterval); + addSequencesFromServer(sequencesByInterval); - addSequencesFromServer(listOfSequences); - addSequencesFromCache(listOfSequences); + if(sequencesByInterval.isEmpty()) { + return Sequences.empty(); + } Collections.sort( - listOfSequences, - Ordering.natural().onResultOf(Pair.>lhsFn()) + sequencesByInterval, + Ordering.from(Comparators.intervalsByStartThenEnd()).onResultOf(Pair.>lhsFn()) ); + // result sequences from overlapping intervals could start anywhere within that interval + // therefore we cannot assume any ordering with respect to the first result from each + // and must resort to calling toolchest.mergeSequencesUnordered for those. + Iterator>> iterator = sequencesByInterval.iterator(); + Pair> current = iterator.next(); + final List> orderedSequences = Lists.newLinkedList(); - DateTime unorderedStart = null; List> unordered = Lists.newLinkedList(); - for (Pair> sequencePair : listOfSequences) { - if (unorderedStart != null && unorderedStart.getMillis() != sequencePair.lhs.getMillis()) { + + unordered.add(current.rhs); + + while(iterator.hasNext()) { + Pair> next = iterator.next(); + if(!next.lhs.overlaps(current.lhs)) { orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered))); unordered = Lists.newLinkedList(); } - unorderedStart = sequencePair.lhs; - unordered.add(sequencePair.rhs); + unordered.add(next.rhs); + current = next; } if(!unordered.isEmpty()) { orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered))); } - return toolChest.mergeSequences(Sequences.simple(orderedSequences)); + return toolChest.mergeSequencesUnordered(Sequences.simple(orderedSequences)); } - private void addSequencesFromCache(ArrayList>> listOfSequences) + private void addSequencesFromCache(ArrayList>> listOfSequences) { if (strategy == null) { return; @@ -286,7 +301,7 @@ public class CachingClusteredClient implements QueryRunner final Function pullFromCacheFunction = strategy.pullFromCache(); final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz(); - for (Pair cachedResultPair : cachedResults) { + for (Pair cachedResultPair : cachedResults) { final byte[] cachedResult = cachedResultPair.rhs; Sequence cachedSequence = new BaseSequence<>( new BaseSequence.IteratorMaker>() @@ -320,7 +335,7 @@ public class CachingClusteredClient implements QueryRunner } @SuppressWarnings("unchecked") - private void addSequencesFromServer(ArrayList>> listOfSequences) + private void addSequencesFromServer(ArrayList>> listOfSequences) { for (Map.Entry> entry : serverSegments.entrySet()) { final DruidServer server = entry.getKey(); @@ -396,7 +411,13 @@ public class CachingClusteredClient implements QueryRunner ); } - listOfSequences.add(Pair.of(intervals.get(0).getStart(), resultSeqToAdd)); + + listOfSequences.add( + Pair.of( + new Interval(intervals.get(0).getStart(), intervals.get(intervals.size() - 1).getEnd()), + resultSeqToAdd + ) + ); } } }