From ec470288c7b725f5310bcf69d1db9f85ff509c8d Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Mon, 25 Mar 2019 11:01:35 -0700 Subject: [PATCH] Fixed failing tests -> allow usage of all types of segment spec --- .../query/scan/ScanQueryRunnerFactory.java | 41 +++++++++---------- .../scan/ScanQueryRunnerFactoryTest.java | 3 +- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index dbad5fb0070..e8838995220 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -38,8 +38,6 @@ import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; -import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.Segment; import org.joda.time.Interval; @@ -67,6 +65,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery> { ScanQuery query = (ScanQuery) queryPlus.getQuery(); - List<SegmentDescriptor> descriptorsOrdered = - ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order + List<Interval> intervalsOrdered = + query.getQuerySegmentSpec().getIntervals(); // Ascending time order List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - descriptorsOrdered = Lists.reverse(descriptorsOrdered); + intervalsOrdered = Lists.reverse(intervalsOrdered); queryRunnersOrdered = Lists.reverse(queryRunnersOrdered); } @@ -127,31 +126,31 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery> input -> input.run(queryPlus, responseContext) )), query, - 
descriptorsOrdered + intervalsOrdered ); } else { Preconditions.checkState( - descriptorsOrdered.size() == queryRunnersOrdered.size(), - "Number of segment descriptors does not equal number of " + intervalsOrdered.size() == queryRunnersOrdered.size(), + "Number of intervals from the query segment spec does not equal number of " "query runners...something went wrong!" ); // Combine the two lists of segment descriptors and query runners into a single list of // segment descriptors - query runner pairs. This makes it easier to use stream operators. - List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>(); + List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>(); for (int i = 0; i < queryRunnersOrdered.size(); i++) { - descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i))); + intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i))); } // Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the // query runners for that segment - LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval = - descriptorsAndRunnersOrdered.stream() - .collect(Collectors.groupingBy( - x -> x.lhs.getInterval(), - LinkedHashMap::new, - Collectors.toList() - )); + LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval = + intervalsAndRunnersOrdered.stream() + .collect(Collectors.groupingBy( + x -> x.lhs, + LinkedHashMap::new, + Collectors.toList() + )); // Find the segment with the largest numbers of partitions. This will be used to compare with the // maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory. 
@@ -224,7 +223,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery> Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue( Sequence<ScanResultValue> inputSequence, ScanQuery scanQuery, - List<SegmentDescriptor> descriptorsOrdered + List<Interval> intervalsOrdered ) { Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); @@ -267,9 +266,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery> if (numRowsScanned > limit && finalInterval == null) { long timestampOfLimitRow = next.getFirstEventTimestamp(scanQuery.getResultFormat()); - for (SegmentDescriptor descriptor : descriptorsOrdered) { - if (descriptor.getInterval().contains(timestampOfLimitRow)) { - finalInterval = descriptor.getInterval(); + for (Interval interval : intervalsOrdered) { + if (interval.contains(timestampOfLimitRow)) { + finalInterval = interval; } } if (finalInterval == null) { diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index cbf3a9d5cf6..2287b45b588 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryRunnerTestHelper; -import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -124,7 +123,7 @@ public class ScanQueryRunnerFactoryTest factory.sortAndLimitScanResultValuesPriorityQueue( inputSequence, query, - ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0)) + ImmutableList.of(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1))) ).toList(); // check 
each scan result value has one event