diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index e8838995220..dbad5fb0070 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -38,6 +38,8 @@
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.Segment;
 import org.joda.time.Interval;
@@ -65,7 +67,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
   {
     ScanQuery query = (ScanQuery) queryPlus.getQuery();
-    List<Interval> intervalsOrdered =
-        query.getQuerySegmentSpec().getIntervals(); // Ascending time order
+    List<SegmentDescriptor> descriptorsOrdered =
+        ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
     List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default

     if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-      intervalsOrdered = Lists.reverse(intervalsOrdered);
+      descriptorsOrdered = Lists.reverse(descriptorsOrdered);
       queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
     }
@@ -126,31 +127,31 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
               input.run(queryPlus, responseContext)
           )),
           query,
-          intervalsOrdered
+          descriptorsOrdered
       );
     } else {
       Preconditions.checkState(
-          intervalsOrdered.size() == queryRunnersOrdered.size(),
-          "Number of intervals from the query segment spec does not equal number of "
+          descriptorsOrdered.size() == queryRunnersOrdered.size(),
+          "Number of segment descriptors does not equal number of "
           + "query runners...something went wrong!"
       );

       // Combine the two lists of segment descriptors and query runners into a single list of
       // segment descriptors - query runner pairs. This makes it easier to use stream operators.
-      List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
+      List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
       for (int i = 0; i < queryRunnersOrdered.size(); i++) {
-        intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
+        descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
       }

       // Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
       // query runners for that segment
-      LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
-          intervalsAndRunnersOrdered.stream()
-                                    .collect(Collectors.groupingBy(
-                                        x -> x.lhs,
-                                        LinkedHashMap::new,
-                                        Collectors.toList()
-                                    ));
+      LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+          descriptorsAndRunnersOrdered.stream()
+                                      .collect(Collectors.groupingBy(
+                                          x -> x.lhs.getInterval(),
+                                          LinkedHashMap::new,
+                                          Collectors.toList()
+                                      ));

       // Find the segment with the largest numbers of partitions. This will be used to compare with the
       // maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory.
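Note on the grouping change above (commentary, not part of the patch): the classifier now keys each pair by `x.lhs.getInterval()` rather than the raw `lhs`, and the `LinkedHashMap::new` map factory is what preserves the ascending-time encounter order of the pairs across groups. Below is a minimal, self-contained sketch of that behavior; the `Descriptor` record and the `String` runner are hypothetical stand-ins for Druid's `SegmentDescriptor` and `QueryRunner`, not the real classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

public class GroupByIntervalSketch
{
  // Hypothetical stand-in for org.apache.druid.query.SegmentDescriptor.
  record Descriptor(String interval, int partition) {}

  // Hypothetical stand-in for org.apache.druid.java.util.common.Pair.
  record Pair<L, R>(L lhs, R rhs) {}

  public static void main(String[] args)
  {
    // Pairs arrive in ascending time order, with multiple partitions per interval.
    List<Pair<Descriptor, String>> descriptorsAndRunnersOrdered = List.of(
        new Pair<>(new Descriptor("2010/2011", 0), "runnerA"),
        new Pair<>(new Descriptor("2010/2011", 1), "runnerB"),
        new Pair<>(new Descriptor("2011/2012", 0), "runnerC")
    );

    // groupingBy with a LinkedHashMap factory keeps the first-seen order of the
    // interval keys, so the grouped map still iterates in ascending time order.
    LinkedHashMap<String, List<Pair<Descriptor, String>>> partitionsGroupedByInterval =
        descriptorsAndRunnersOrdered.stream()
                                    .collect(Collectors.groupingBy(
                                        p -> p.lhs().interval(),
                                        LinkedHashMap::new,
                                        Collectors.toList()
                                    ));

    partitionsGroupedByInterval.forEach(
        (interval, pairs) -> System.out.println(interval + " -> " + pairs.size() + " partition runner(s)")
    );
    // Prints "2010/2011 -> 2 partition runner(s)" then "2011/2012 -> 1 partition runner(s)"
  }
}
```

With a plain `HashMap`-backed collector the group iteration order would depend on hashing, which would break the downstream assumption that interval groups arrive in time order.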
@@ -223,7 +224,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
   Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue(
       Sequence<ScanResultValue> inputSequence,
       ScanQuery scanQuery,
-      List<Interval> intervalsOrdered
+      List<SegmentDescriptor> descriptorsOrdered
   )
   {
     Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
@@ -266,9 +267,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       if (numRowsScanned > limit && finalInterval == null) {
         long timestampOfLimitRow = next.getFirstEventTimestamp(scanQuery.getResultFormat());
-        for (Interval interval : intervalsOrdered) {
-          if (interval.contains(timestampOfLimitRow)) {
-            finalInterval = interval;
+        for (SegmentDescriptor descriptor : descriptorsOrdered) {
+          if (descriptor.getInterval().contains(timestampOfLimitRow)) {
+            finalInterval = descriptor.getInterval();
           }
         }
         if (finalInterval == null) {
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
index 2287b45b588..cbf3a9d5cf6 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.SegmentDescriptor;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -123,7 +124,7 @@ public class ScanQueryRunnerFactoryTest
       factory.sortAndLimitScanResultValuesPriorityQueue(
           inputSequence,
           query,
-          ImmutableList.of(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)))
+          ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0))
       ).toList();

     // check each scan result value has one event
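A closing note on the `sortAndLimitScanResultValuesPriorityQueue` change (commentary, not part of the patch): once more than `limit` rows have been scanned, the patched loop walks the ordered descriptors and records the interval containing the limit row's timestamp, so results from later intervals can be pruned. A simplified, self-contained sketch of that lookup follows; the `Descriptor` record with a half-open millisecond range is a hypothetical stand-in for `SegmentDescriptor#getInterval()`.

```java
import java.util.List;

public class LimitRowIntervalSketch
{
  // Hypothetical stand-in for SegmentDescriptor plus its Interval: [startMillis, endMillis).
  record Descriptor(long startMillis, long endMillis)
  {
    boolean contains(long timestamp)
    {
      return timestamp >= startMillis && timestamp < endMillis;
    }
  }

  public static void main(String[] args)
  {
    // Descriptors in ascending time order, mirroring descriptorsOrdered in the patch.
    List<Descriptor> descriptorsOrdered = List.of(
        new Descriptor(0L, 1_000L),
        new Descriptor(1_000L, 2_000L),
        new Descriptor(2_000L, 3_000L)
    );
    long timestampOfLimitRow = 1_500L;

    // Mirror of the patched loop: find the interval that contains the limit row's
    // timestamp; rows outside it can no longer affect the limited output.
    Descriptor finalInterval = null;
    for (Descriptor descriptor : descriptorsOrdered) {
      if (descriptor.contains(timestampOfLimitRow)) {
        finalInterval = descriptor;
      }
    }
    System.out.println("Limit row falls in: " + finalInterval); // Descriptor[startMillis=1000, endMillis=2000]
  }
}
```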