Check type of segment spec before using for time ordering

Justin Borromeo 2019-03-25 15:19:45 -07:00
parent b822fc73df
commit da4fc66403
1 changed file with 100 additions and 94 deletions
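Previously, the cast of getQuerySegmentSpec() to MultipleSpecificSegmentSpec ran before the code checked whether the query was time-ordered at all, so a scan query carrying any other spec type failed with a raw ClassCastException. The cast now happens only on the time-ordered path, behind an instanceof guard that throws a descriptive UOE instead. The guard idiom, as a minimal sketch (identifiers taken from the diff below; the error message is abbreviated):

    QuerySegmentSpec spec = query.getQuerySegmentSpec();
    if (!(spec instanceof MultipleSpecificSegmentSpec)) {
      // Reject unsupported spec types up front instead of failing on the cast.
      throw new UOE("Time-ordering requires MultipleSpecificSegmentSpec, got %s", spec.getClass());
    }
    List<SegmentDescriptor> descriptors = ((MultipleSpecificSegmentSpec) spec).getDescriptors();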


@@ -92,15 +92,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue
return (queryPlus, responseContext) -> {
ScanQuery query = (ScanQuery) queryPlus.getQuery();
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
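// e.g. a query context of {"timeout": 5000} puts the deadline five seconds out; per the
// note above, when the context has no timeout this value is computed but never enforced.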
@@ -119,103 +110,118 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue
} else {
return returnedRows;
}
} else if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
// Use priority queue strategy
return sortAndLimitScanResultValuesPriorityQueue(
Sequences.concat(Sequences.map(
Sequences.simple(queryRunnersOrdered),
input -> input.run(queryPlus, responseContext)
)),
query,
descriptorsOrdered
);
} else {
Preconditions.checkState(
descriptorsOrdered.size() == queryRunnersOrdered.size(),
"Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!"
);
if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+ "of type MultipleSpecificSegmentSpec");
}
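// Fail fast: the cast below is safe only for MultipleSpecificSegmentSpec, the one spec
// type that exposes the per-segment descriptors that time ordering relies on.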
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
// Combine the two lists of segment descriptors and query runners into a single list of
// segment descriptors - query runner pairs. This makes it easier to use stream operators.
List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
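// e.g. segments covering t1 < t2 < t3 arrive as [d1, d2, d3] / [r1, r2, r3]; a DESCENDING
// query reverses both lists in lockstep so index i still pairs each descriptor with the
// runner that scans it.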
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
// query runners for that segment
LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
descriptorsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy(
x -> x.lhs.getInterval(),
LinkedHashMap::new,
Collectors.toList()
));
if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
// Use priority queue strategy
return sortAndLimitScanResultValuesPriorityQueue(
Sequences.concat(Sequences.map(
Sequences.simple(queryRunnersOrdered),
input -> input.run(queryPlus, responseContext)
)),
query,
descriptorsOrdered
);
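// The priority-queue strategy buffers up to `limit` rows while sorting, which is why it
// is gated on limit <= maxRowsQueuedForOrdering above.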
} else {
Preconditions.checkState(
descriptorsOrdered.size() == queryRunnersOrdered.size(),
"Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!"
);
// Find the segment with the largest number of partitions. This will be used to compare with the
// maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory.
int maxNumPartitionsInSegment =
partitionsGroupedByInterval.values()
.stream()
.map(x -> x.size())
.max(Comparator.comparing(Integer::valueOf))
.get();
// Combine the two lists of segment descriptors and query runners into a single list of
// segment descriptors - query runner pairs. This makes it easier to use stream operators.
List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
}
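// Hypothetical resulting order for two segments with two partitions each:
// [(d1p0, r1p0), (d1p1, r1p1), (d2p0, r2p0), (d2p1, r2p1)], index-aligned by construction.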
if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
// Use n-way merge strategy
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
// query runners for that segment
LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
descriptorsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy(
x -> x.lhs.getInterval(),
LinkedHashMap::new,
Collectors.toList()
));
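// LinkedHashMap::new keeps the intervals in encounter (i.e. time) order; the map used by
// a default Collectors.groupingBy makes no ordering guarantee.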
// Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval,
// so there should be no interval overlap). We create a list of lists so we can create a sequence of
// sequences. There's no easy way to convert a LinkedHashMap to a sequence because it's not Iterable.
List<List<QueryRunner<ScanResultValue>>> groupedRunners =
partitionsGroupedByInterval.entrySet()
// Find the segment with the largest number of partitions. This will be used to compare with the
// maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory.
int maxNumPartitionsInSegment =
partitionsGroupedByInterval.values()
.stream()
.map(entry -> entry.getValue()
.stream()
.map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
.collect(Collectors.toList()))
.collect(Collectors.toList());
.map(x -> x.size())
.max(Comparator.comparing(Integer::valueOf))
.get();
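// The n-way merge below holds results from every partition of one interval in memory at
// once, so the widest interval determines the worst case checked here.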
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
// Use n-way merge strategy
return Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
query
)).reverse()
)
)
).limit(
Math.toIntExact(query.getLimit())
// Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval,
// so there should be no interval overlap). We create a list of lists so we can create a sequence of
// sequences. There's no easy way to convert a LinkedHashMap to a sequence because it's not Iterable.
List<List<QueryRunner<ScanResultValue>>> groupedRunners =
partitionsGroupedByInterval.entrySet()
.stream()
.map(entry -> entry.getValue()
.stream()
.map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
.collect(Collectors.toList()))
.collect(Collectors.toList());
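// Illustrative shape: [[r1p0, r1p1], [r2p0], ...], one inner list of partition
// runners per interval, still in time order.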
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
return Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
query
)).reverse()
)
)
).limit(
Math.toIntExact(query.getLimit())
);
}
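// Neither in-memory strategy can safely order a query of this size, so reject it with
// guidance on the two configurable limits.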
throw new UOE(
"Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+ " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
+ " %,d partitions or lower the row limit below %,d.",
maxNumPartitionsInSegment,
query.getLimit(),
scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
scanQueryConfig.getMaxRowsQueuedForOrdering()
);
}
throw new UOE(
"Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+ " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
+ " %,d partitions or lower the row limit below %,d.",
maxNumPartitionsInSegment,
query.getLimit(),
scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
scanQueryConfig.getMaxRowsQueuedForOrdering()
);
}
};
}
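
The numbered comments in the n-way merge branch describe combining many individually time-ordered sequences into one. Druid's Sequence.flatMerge does this lazily; the same idea in plain JDK types, as a rough sketch rather than the actual flatMerge implementation, is a heap of per-list cursors that merges k sorted lists while holding only one element per list:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    class KWayMergeSketch
    {
      // Merge k individually sorted lists into one sorted list, mirroring how flatMerge
      // combines a runner group's per-partition result sequences by timestamp.
      static List<Long> merge(List<List<Long>> sortedLists, Comparator<Long> cmp)
      {
        // Each heap entry is {listIndex, positionInList}, ordered by the element it points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> cmp.compare(sortedLists.get(a[0]).get(a[1]), sortedLists.get(b[0]).get(b[1]))
        );
        for (int i = 0; i < sortedLists.size(); i++) {
          if (!sortedLists.get(i).isEmpty()) {
            heap.add(new int[]{i, 0});
          }
        }
        List<Long> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
          int[] top = heap.poll();
          merged.add(sortedLists.get(top[0]).get(top[1]));
          if (top[1] + 1 < sortedLists.get(top[0]).size()) {
            heap.add(new int[]{top[0], top[1] + 1}); // advance the cursor within the same list
          }
        }
        return merged;
      }
    }

Because the heap never holds more than one entry per merged sequence, the partition count of the widest interval (maxNumPartitionsInSegment) bounds the merge's working set, which is exactly what the maxSegmentPartitionsOrderedInMemory check guards.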