mirror of https://github.com/apache/druid.git
Merge branch '6088-Time-Ordering-On-Scans-N-Way-Merge' of github.com:justinborromeo/incubator-druid into 6088-Time-Ordering-On-Scans-N-Way-Merge
This commit is contained in:
commit
57033f36df
|
@ -67,6 +67,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
||||||
@Inject
|
@Inject
|
||||||
public ScanQueryRunnerFactory(
|
public ScanQueryRunnerFactory(
|
||||||
ScanQueryQueryToolChest toolChest,
|
ScanQueryQueryToolChest toolChest,
|
||||||
|
|
||||||
ScanQueryEngine engine,
|
ScanQueryEngine engine,
|
||||||
ScanQueryConfig scanQueryConfig
|
ScanQueryConfig scanQueryConfig
|
||||||
)
|
)
|
||||||
|
@ -92,12 +93,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
||||||
return (queryPlus, responseContext) -> {
|
return (queryPlus, responseContext) -> {
|
||||||
ScanQuery query = (ScanQuery) queryPlus.getQuery();
|
ScanQuery query = (ScanQuery) queryPlus.getQuery();
|
||||||
|
|
||||||
List<SegmentDescriptor> descriptorsOrdered =
|
List<Interval> intervalsOrdered =
|
||||||
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
|
query.getQuerySegmentSpec().getIntervals(); // Ascending time order
|
||||||
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
|
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
|
||||||
|
|
||||||
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
|
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
|
||||||
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
|
intervalsOrdered = Lists.reverse(intervalsOrdered);
|
||||||
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
|
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,28 +128,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
||||||
input -> input.run(queryPlus, responseContext)
|
input -> input.run(queryPlus, responseContext)
|
||||||
)),
|
)),
|
||||||
query,
|
query,
|
||||||
descriptorsOrdered
|
intervalsOrdered
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
Preconditions.checkState(
|
Preconditions.checkState(
|
||||||
descriptorsOrdered.size() == queryRunnersOrdered.size(),
|
intervalsOrdered.size() == queryRunnersOrdered.size(),
|
||||||
"Number of segment descriptors does not equal number of "
|
"Number of intervals from the query segment spec does not equal number of "
|
||||||
+ "query runners...something went wrong!"
|
+ "query runners...something went wrong!"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Combine the two lists of segment descriptors and query runners into a single list of
|
// Combine the two lists of intervals and query runners into a single list of
|
||||||
// segment descriptors - query runner pairs. This makes it easier to use stream operators.
|
// interval - query runner pairs. This makes it easier to use stream operators.
|
||||||
List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
|
List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
|
||||||
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
|
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
|
||||||
descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
|
intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
|
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
|
||||||
// query runners for that segment
|
// query runners for that interval
|
||||||
LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
|
LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
|
||||||
descriptorsAndRunnersOrdered.stream()
|
intervalsAndRunnersOrdered.stream()
|
||||||
.collect(Collectors.groupingBy(
|
.collect(Collectors.groupingBy(
|
||||||
x -> x.lhs.getInterval(),
|
x -> x.lhs,
|
||||||
LinkedHashMap::new,
|
LinkedHashMap::new,
|
||||||
Collectors.toList()
|
Collectors.toList()
|
||||||
));
|
));
|
||||||
|
@ -224,7 +225,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
||||||
Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue(
|
Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue(
|
||||||
Sequence<ScanResultValue> inputSequence,
|
Sequence<ScanResultValue> inputSequence,
|
||||||
ScanQuery scanQuery,
|
ScanQuery scanQuery,
|
||||||
List<SegmentDescriptor> descriptorsOrdered
|
List<Interval> intervalsOrdered
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
|
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
|
||||||
|
@ -267,9 +268,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
||||||
// Finish scanning the interval containing the limit row
|
// Finish scanning the interval containing the limit row
|
||||||
if (numRowsScanned > limit && finalInterval == null) {
|
if (numRowsScanned > limit && finalInterval == null) {
|
||||||
long timestampOfLimitRow = next.getFirstEventTimestamp(scanQuery.getResultFormat());
|
long timestampOfLimitRow = next.getFirstEventTimestamp(scanQuery.getResultFormat());
|
||||||
for (SegmentDescriptor descriptor : descriptorsOrdered) {
|
for (Interval interval : intervalsOrdered) {
|
||||||
if (descriptor.getInterval().contains(timestampOfLimitRow)) {
|
if (interval.contains(timestampOfLimitRow)) {
|
||||||
finalInterval = descriptor.getInterval();
|
finalInterval = interval;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (finalInterval == null) {
|
if (finalInterval == null) {
|
||||||
|
|
|
@ -124,7 +124,7 @@ public class ScanQueryRunnerFactoryTest
|
||||||
factory.sortAndLimitScanResultValuesPriorityQueue(
|
factory.sortAndLimitScanResultValuesPriorityQueue(
|
||||||
inputSequence,
|
inputSequence,
|
||||||
query,
|
query,
|
||||||
ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0))
|
ImmutableList.of(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)))
|
||||||
).toList();
|
).toList();
|
||||||
|
|
||||||
// check each scan result value has one event
|
// check each scan result value has one event
|
||||||
|
|
Loading…
Reference in New Issue