diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md index 1b97ed1c49d..ee44681b025 100644 --- a/docs/content/querying/scan-query.md +++ b/docs/content/querying/scan-query.md @@ -159,7 +159,7 @@ The Scan query currently supports ordering based on timestamp for non-legacy que will yield results that do not indicate which segment rows are from (`segmentId` will show up as `null`). Furthermore, time ordering is only supported where the result set limit is less than `druid.query.scan.maxRowsQueuedForOrdering` rows **or** all segments scanned have fewer than `druid.query.scan.maxSegmentPartitionsOrderedInMemory` partitions. Also, -time ordering is not support for queries issued directly to historicals unless a list of segments is specified. The +time ordering is not supported for queries issued directly to historicals unless a list of segments is specified. The reasoning behind these limitations is that the implementation of time ordering uses two strategies that can consume too much heap memory if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on query result set limit and the number of segments being scanned. diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 5ad510874bc..41729f78638 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -111,13 +111,18 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory descriptorsOrdered = - ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order - List> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default + ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); + List> queryRunnersOrdered = Lists.newArrayList(queryRunners); if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { descriptorsOrdered = Lists.reverse(descriptorsOrdered); @@ -208,8 +213,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); if (scanQuery.getLimit() > Integer.MAX_VALUE) { - throw new UOE("Limit of %,d rows not supported for priority queue strategy of time-ordering scan results", - scanQuery.getLimit() + throw new UOE( + "Limit of %,d rows not supported for priority queue strategy of time-ordering scan results", + scanQuery.getLimit() ); } @@ -288,28 +294,32 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory - Sequences.map( - Sequences.simple(runnerGroup), - (input) -> Sequences.concat( - Sequences.map( - input.run(queryPlus, responseContext), - srv -> Sequences.simple(srv.toSingleEventScanResultValues()) + Sequence resultSequence = + Sequences.concat( + Sequences.map( + Sequences.simple(groupedRunners), + runnerGroup -> + Sequences.map( + Sequences.simple(runnerGroup), + (input) -> Sequences.concat( + Sequences.map( + input.run(queryPlus, responseContext), + srv -> Sequences.simple(srv.toSingleEventScanResultValues()) + ) ) + ).flatMerge( + seq -> seq, + Ordering.from(new ScanResultValueTimestampComparator( + (ScanQuery) queryPlus.getQuery() + )).reverse() ) - ).flatMerge( - seq -> seq, - Ordering.from(new ScanResultValueTimestampComparator( - (ScanQuery) queryPlus.getQuery() - )).reverse() - ) - ) - ).limit( - ((ScanQuery) (queryPlus.getQuery())).getLimit() - ); + ) + ); + long limit = ((ScanQuery) (queryPlus.getQuery())).getLimit(); + if (limit == Long.MAX_VALUE) { + return resultSequence; + } + return resultSequence.limit(limit); } @Override