From da4fc664031debae1dc3b4a0190125e979564aac Mon Sep 17 00:00:00 2001
From: Justin Borromeo
Date: Mon, 25 Mar 2019 15:19:45 -0700
Subject: [PATCH] Check type of segment spec before using for time ordering

---
 .../query/scan/ScanQueryRunnerFactory.java | 194 +++++++++---------
 1 file changed, 100 insertions(+), 94 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index dbad5fb0070..6f14d70d527 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -92,15 +92,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
   {
     ScanQuery query = (ScanQuery) queryPlus.getQuery();
-    List<SegmentDescriptor> descriptorsOrdered =
-        ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
-    List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
-
-    if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-      descriptorsOrdered = Lists.reverse(descriptorsOrdered);
-      queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
-    }
-
     // Note: this variable is effective only when queryContext has a timeout.
     // See the comment of CTX_TIMEOUT_AT.
     final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());

@@ -119,103 +110,118 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
             input -> input.run(queryPlus, responseContext)
-        )),
-        query,
-        descriptorsOrdered
-    );
     } else {
-      Preconditions.checkState(
-          descriptorsOrdered.size() == queryRunnersOrdered.size(),
-          "Number of segment descriptors does not equal number of "
-          + "query runners...something went wrong!"
-      );
+      if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
+        throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs "
+                      + "of type MultipleSpecificSegmentSpec");
+      }
+      List<SegmentDescriptor> descriptorsOrdered =
+          ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
+      List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default

-      // Combine the two lists of segment descriptors and query runners into a single list of
-      // segment descriptors - query runner pairs. This makes it easier to use stream operators.
-      List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
-      for (int i = 0; i < queryRunnersOrdered.size(); i++) {
-        descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+      if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
+        descriptorsOrdered = Lists.reverse(descriptorsOrdered);
+        queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
       }

-      // Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
-      // query runners for that segment
-      LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
-          descriptorsAndRunnersOrdered.stream()
-                                      .collect(Collectors.groupingBy(
-                                          x -> x.lhs.getInterval(),
-                                          LinkedHashMap::new,
-                                          Collectors.toList()
-                                      ));
+      if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
+        // Use priority queue strategy
+        return sortAndLimitScanResultValuesPriorityQueue(
+            Sequences.concat(Sequences.map(
+                Sequences.simple(queryRunnersOrdered),
+                input -> input.run(queryPlus, responseContext)
+            )),
+            query,
+            descriptorsOrdered
+        );
+      } else {
+        Preconditions.checkState(
+            descriptorsOrdered.size() == queryRunnersOrdered.size(),
+            "Number of segment descriptors does not equal number of "
+            + "query runners...something went wrong!"
+        );

-      // Find the segment with the largest numbers of partitions. This will be used to compare with the
-      // maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory.
-      int maxNumPartitionsInSegment =
-          partitionsGroupedByInterval.values()
-                                     .stream()
-                                     .map(x -> x.size())
-                                     .max(Comparator.comparing(Integer::valueOf))
-                                     .get();
+        // Combine the two lists of segment descriptors and query runners into a single list of
+        // segment descriptors - query runner pairs. This makes it easier to use stream operators.
+        List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
+        for (int i = 0; i < queryRunnersOrdered.size(); i++) {
+          descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+        }

-      if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
-        // Use n-way merge strategy
+        // Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
+        // query runners for that segment
+        LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+            descriptorsAndRunnersOrdered.stream()
+                                        .collect(Collectors.groupingBy(
+                                            x -> x.lhs.getInterval(),
+                                            LinkedHashMap::new,
+                                            Collectors.toList()
+                                        ));

-        // Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->
-        // there should be no interval overlap. We create a list of lists so we can create a sequence of sequences.
-        // There's no easy way to convert a LinkedHashMap to a sequence because it's non-iterable.
-        List<List<QueryRunner<ScanResultValue>>> groupedRunners =
-            partitionsGroupedByInterval.entrySet()
+        // Find the segment with the largest number of partitions. This will be used to compare with the
+        // maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory.
+        int maxNumPartitionsInSegment =
+            partitionsGroupedByInterval.values()
                                        .stream()
-                                       .map(entry -> entry.getValue()
-                                                          .stream()
-                                                          .map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
-                                                          .collect(Collectors.toList()))
-                                       .collect(Collectors.toList());
+                                       .map(x -> x.size())
+                                       .max(Comparator.comparing(Integer::valueOf))
+                                       .get();

-        // Starting from the innermost Sequences.map:
-        // (1) Deaggregate each ScanResultValue returned by the query runners
-        // (2) Combine the deaggregated ScanResultValues into a single sequence
-        // (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
-        // (4) Create a sequence of results from each runner group
-        // (5) Join all the results into a single sequence
+        if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
+          // Use n-way merge strategy

-        return Sequences.concat(
-            Sequences.map(
-                Sequences.simple(groupedRunners),
-                runnerGroup ->
-                    Sequences.map(
-                        Sequences.simple(runnerGroup),
-                        (input) -> Sequences.concat(
-                            Sequences.map(
-                                input.run(queryPlus, responseContext),
-                                srv -> Sequences.simple(srv.toSingleEventScanResultValues())
-                            )
-                        )
-                    ).flatMerge(
-                        seq -> seq,
-                        Ordering.from(new ScanResultValueTimestampComparator(
-                            query
-                        )).reverse()
-                    )
-            )
-        ).limit(
-            Math.toIntExact(query.getLimit())
+          // Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->
+          // there should be no interval overlap. We create a list of lists so we can create a sequence of sequences.
+          // There's no easy way to convert a LinkedHashMap to a sequence because it's non-iterable.
+          List<List<QueryRunner<ScanResultValue>>> groupedRunners =
+              partitionsGroupedByInterval.entrySet()
+                                         .stream()
+                                         .map(entry -> entry.getValue()
+                                                            .stream()
+                                                            .map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
+                                                            .collect(Collectors.toList()))
+                                         .collect(Collectors.toList());

+          // Starting from the innermost Sequences.map:
+          // (1) Deaggregate each ScanResultValue returned by the query runners
+          // (2) Combine the deaggregated ScanResultValues into a single sequence
+          // (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
+          // (4) Create a sequence of results from each runner group
+          // (5) Join all the results into a single sequence

+          return Sequences.concat(
+              Sequences.map(
+                  Sequences.simple(groupedRunners),
+                  runnerGroup ->
+                      Sequences.map(
+                          Sequences.simple(runnerGroup),
+                          (input) -> Sequences.concat(
+                              Sequences.map(
+                                  input.run(queryPlus, responseContext),
+                                  srv -> Sequences.simple(srv.toSingleEventScanResultValues())
+                              )
+                          )
+                      ).flatMerge(
+                          seq -> seq,
+                          Ordering.from(new ScanResultValueTimestampComparator(
+                              query
+                          )).reverse()
+                      )
+              )
+          ).limit(
+              Math.toIntExact(query.getLimit())
+          );
+        }
+        throw new UOE(
+            "Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+            + " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
+            + " %,d partitions or lower the row limit below %,d.",
+            maxNumPartitionsInSegment,
+            query.getLimit(),
+            scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
+            scanQueryConfig.getMaxRowsQueuedForOrdering()
         );
       }
-      throw new UOE(
-          "Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
-          + " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
-          + " %,d partitions or lower the row limit below %,d.",
-          maxNumPartitionsInSegment,
-          query.getLimit(),
-          scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
-          scanQueryConfig.getMaxRowsQueuedForOrdering()
-      );
     }
   };
 }