diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java index 4ec29f5208a..4103b147697 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java @@ -123,7 +123,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator) srv.getEvents())); yielder = yielder.next(null); count++; } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 4bde1d3e8d9..645f57bf425 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -20,11 +20,13 @@ package org.apache.druid.query.scan; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -38,18 +40,19 @@ import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; -import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.Segment; import org.joda.time.Interval; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; -import 
java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; public class ScanQueryRunnerFactory implements QueryRunnerFactory { @@ -88,11 +91,16 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory { ScanQuery query = (ScanQuery) queryPlus.getQuery(); - int numSegments = 0; - final Iterator> segmentIt = queryRunners.iterator(); - for (; segmentIt.hasNext(); numSegments++) { - segmentIt.next(); + + List descriptorsOrdered = + ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order + List> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default + + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + descriptorsOrdered = Lists.reverse(descriptorsOrdered); + queryRunnersOrdered = Lists.reverse(queryRunnersOrdered); } + // Note: this variable is effective only when queryContext has a timeout. // See the comment of CTX_TIMEOUT_AT. 
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery()); @@ -113,16 +121,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory descriptorsOrdered = - ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order - List> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default - - if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - descriptorsOrdered = Lists.reverse(descriptorsOrdered); - queryRunnersOrdered = Lists.reverse(queryRunnersOrdered); - } - - return sortAndLimitScanResultValues( + return sortAndLimitScanResultValuesPriorityQueue( Sequences.concat(Sequences.map( Sequences.simple(queryRunnersOrdered), input -> input.run(queryPlus, responseContext) @@ -130,42 +129,79 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory unbatched = - Sequences.map( - Sequences.simple(queryRunners), - (input) -> Sequences.concat( - Sequences.map( - input.run(queryPlus, responseContext), - srv -> Sequences.simple(srv.toSingleEventScanResultValues()) - ) - ) - ).flatMerge( - seq -> seq, - Ordering.from(new ScanResultValueTimestampComparator( - query - )).reverse() - ).limit( - Math.toIntExact(query.getLimit()) - ); + } else { + Preconditions.checkState( + descriptorsOrdered.size() == queryRunnersOrdered.size(), + "Number of segment descriptors does not equal number of " + + "query runners...something went wrong!" 
+ ); - return unbatched; + List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>(); + + for (int i = 0; i < queryRunnersOrdered.size(); i++) { + descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i))); + } + + LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval = + descriptorsAndRunnersOrdered.stream() + .collect(Collectors.groupingBy( + x -> x.lhs.getInterval(), + LinkedHashMap::new, + Collectors.toList() + )); + + int maxNumPartitionsInSegment = + partitionsGroupedByInterval.values() + .stream() + .map(x -> x.size()) + .max(Comparator.comparing(Integer::valueOf)).get(); + + if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentsOrderedInMemory()) { + // Use n-way merge strategy + List<List<QueryRunner<ScanResultValue>>> groupedRunners = new ArrayList<>(descriptorsAndRunnersOrdered.size()); + for (Map.Entry<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> entry : + partitionsGroupedByInterval.entrySet()) { + groupedRunners.add(entry.getValue().stream().map(x -> x.rhs).collect(Collectors.toList())); + } + return Sequences.concat( + Sequences.map( + Sequences.simple(groupedRunners), // Sequence of runnerGroups + runnerGroup -> + Sequences.map( + Sequences.simple(runnerGroup), + (input) -> Sequences.concat( + Sequences.map( + input.run(queryPlus, responseContext), + srv -> Sequences.simple(srv.toSingleEventScanResultValues()) + ) + ) + ).flatMerge( + seq -> seq, + Ordering.from(new ScanResultValueTimestampComparator( + query + )).reverse() + ) + ) + ).limit( + Math.toIntExact(query.getLimit()) + ); + } + throw new UOE( + "Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+ + " Try reducing the scope of the query to scan fewer partitions than the configurable limit of" + + " %,d partitions or lower the row limit below %,d.", + maxNumPartitionsInSegment, + query.getLimit(), + scanQueryConfig.getMaxSegmentsOrderedInMemory(), + scanQueryConfig.getMaxRowsQueuedForOrdering() + ); } - throw new UOE( - "Time ordering for queries of %,d segments per historical and a row limit of %,d is not supported." - + " Try reducing the scope of the query to scan fewer segments than the configurable segment limit of" - + " %,d segments or lower the row limit below %,d.", - numSegments, - query.getLimit(), - scanQueryConfig.getMaxSegmentsOrderedInMemory(), - scanQueryConfig.getMaxRowsQueuedForOrdering() - ); + }; } @VisibleForTesting - Sequence<ScanResultValue> sortAndLimitScanResultValues( + Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue( Sequence<ScanResultValue> inputSequence, ScanQuery scanQuery, List<SegmentDescriptor> descriptorsOrdered diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index 0d32be9e0c0..763191d8b82 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -119,7 +119,7 @@ public class ScanQueryRunnerFactoryTest }); Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs); List<ScanResultValue> output = - factory.sortAndLimitScanResultValues( + factory.sortAndLimitScanResultValuesPriorityQueue( inputSequence, query, null