Revert "Fixed failing tests -> allow usage of all types of segment spec"

This reverts commit ec470288c7b725f5310bcf69d1db9f85ff509c8d.
This commit is contained in:
Justin Borromeo 2019-03-25 13:13:32 -07:00
parent ec470288c7
commit 8f01d8dd16
2 changed files with 23 additions and 21 deletions

View File

@ -38,6 +38,8 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -65,7 +67,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
@Inject @Inject
public ScanQueryRunnerFactory( public ScanQueryRunnerFactory(
ScanQueryQueryToolChest toolChest, ScanQueryQueryToolChest toolChest,
ScanQueryEngine engine, ScanQueryEngine engine,
ScanQueryConfig scanQueryConfig ScanQueryConfig scanQueryConfig
) )
@ -91,12 +92,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return (queryPlus, responseContext) -> { return (queryPlus, responseContext) -> {
ScanQuery query = (ScanQuery) queryPlus.getQuery(); ScanQuery query = (ScanQuery) queryPlus.getQuery();
List<Interval> intervalsOrdered = List<SegmentDescriptor> descriptorsOrdered =
query.getQuerySegmentSpec().getIntervals(); // Ascending time order ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
intervalsOrdered = Lists.reverse(intervalsOrdered); descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered); queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
} }
@ -126,28 +127,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
input -> input.run(queryPlus, responseContext) input -> input.run(queryPlus, responseContext)
)), )),
query, query,
intervalsOrdered descriptorsOrdered
); );
} else { } else {
Preconditions.checkState( Preconditions.checkState(
intervalsOrdered.size() == queryRunnersOrdered.size(), descriptorsOrdered.size() == queryRunnersOrdered.size(),
"Number of intervals from the query segment spec does not equal number of " "Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!" + "query runners...something went wrong!"
); );
// Combine the two lists of segment descriptors and query runners into a single list of // Combine the two lists of segment descriptors and query runners into a single list of
// segment descriptors - query runner pairs. This makes it easier to use stream operators. // segment descriptors - query runner pairs. This makes it easier to use stream operators.
List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>(); List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
for (int i = 0; i < queryRunnersOrdered.size(); i++) { for (int i = 0; i < queryRunnersOrdered.size(); i++) {
intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i))); descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
} }
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the // Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
// query runners for that segment // query runners for that segment
LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval = LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
intervalsAndRunnersOrdered.stream() descriptorsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy( .collect(Collectors.groupingBy(
x -> x.lhs, x -> x.lhs.getInterval(),
LinkedHashMap::new, LinkedHashMap::new,
Collectors.toList() Collectors.toList()
)); ));
@ -223,7 +224,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue( Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue(
Sequence<ScanResultValue> inputSequence, Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery, ScanQuery scanQuery,
List<Interval> intervalsOrdered List<SegmentDescriptor> descriptorsOrdered
) )
{ {
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
@ -266,9 +267,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// Finish scanning the interval containing the limit row // Finish scanning the interval containing the limit row
if (numRowsScanned > limit && finalInterval == null) { if (numRowsScanned > limit && finalInterval == null) {
long timestampOfLimitRow = next.getFirstEventTimestamp(scanQuery.getResultFormat()); long timestampOfLimitRow = next.getFirstEventTimestamp(scanQuery.getResultFormat());
for (Interval interval : intervalsOrdered) { for (SegmentDescriptor descriptor : descriptorsOrdered) {
if (interval.contains(timestampOfLimitRow)) { if (descriptor.getInterval().contains(timestampOfLimitRow)) {
finalInterval = interval; finalInterval = descriptor.getInterval();
} }
} }
if (finalInterval == null) { if (finalInterval == null) {

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -123,7 +124,7 @@ public class ScanQueryRunnerFactoryTest
factory.sortAndLimitScanResultValuesPriorityQueue( factory.sortAndLimitScanResultValuesPriorityQueue(
inputSequence, inputSequence,
query, query,
ImmutableList.of(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1))) ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0))
).toList(); ).toList();
// check each scan result value has one event // check each scan result value has one event