Implemented Clint's recommendations

This commit is contained in:
Justin Borromeo 2019-03-27 20:03:41 -07:00
parent 07503ea5c0
commit 287a367f41
2 changed files with 35 additions and 25 deletions

View File

@ -159,7 +159,7 @@ The Scan query currently supports ordering based on timestamp for non-legacy que
will yield results that do not indicate which segment rows are from (`segmentId` will show up as `null`). Furthermore,
time ordering is only supported where the result set limit is less than `druid.query.scan.maxRowsQueuedForOrdering`
rows **or** all segments scanned have fewer than `druid.query.scan.maxSegmentPartitionsOrderedInMemory` partitions. Also,
time ordering is not support for queries issued directly to historicals unless a list of segments is specified. The
time ordering is not supported for queries issued directly to historicals unless a list of segments is specified. The
reasoning behind these limitations is that the implementation of time ordering uses two strategies that can consume too
much heap memory if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on
query result set limit and the number of segments being scanned.

View File

@ -111,13 +111,18 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return returnedRows;
}
} else {
// Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need
// to be present for a 1:1 matching of intervals with query runners. The other types of segment spec condense
// the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals
// and query runners.
if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+ "of type MultipleSpecificSegmentSpec");
}
// Ascending time order for both descriptors and query runners by default
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors();
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
@ -208,8 +213,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
if (scanQuery.getLimit() > Integer.MAX_VALUE) {
throw new UOE("Limit of %,d rows not supported for priority queue strategy of time-ordering scan results",
scanQuery.getLimit()
throw new UOE(
"Limit of %,d rows not supported for priority queue strategy of time-ordering scan results",
scanQuery.getLimit()
);
}
@ -288,28 +294,32 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
return Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
Sequence<ScanResultValue> resultSequence =
Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
(ScanQuery) queryPlus.getQuery()
)).reverse()
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
(ScanQuery) queryPlus.getQuery()
)).reverse()
)
)
).limit(
((ScanQuery) (queryPlus.getQuery())).getLimit()
);
)
);
long limit = ((ScanQuery) (queryPlus.getQuery())).getLimit();
if (limit == Long.MAX_VALUE) {
return resultSequence;
}
return resultSequence.limit(limit);
}
@Override