More comments

This commit is contained in:
Justin Borromeo 2019-03-22 15:30:41 -07:00
parent 1b46b58aec
commit 62dcedacde
1 changed files with 13 additions and 8 deletions

View File

@ -137,7 +137,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
); );
// Combine the two lists of segment descriptors and query runners into a single list of // Combine the two lists of segment descriptors and query runners into a single list of
// segment descriptors - query runner pairs // segment descriptors - query runner pairs. This makes it easier to use stream operators.
List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>(); List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
for (int i = 0; i < queryRunnersOrdered.size(); i++) { for (int i = 0; i < queryRunnersOrdered.size(); i++) {
descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i))); descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
@ -166,7 +166,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// Use n-way merge strategy // Use n-way merge strategy
// Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) -> // Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->
// there should be no interval overlap // there should be no interval overlap. We create a list of lists so we can create a sequence of sequences.
// There's no easy way to convert a LinkedHashMap to a sequence because it's non-iterable.
List<List<QueryRunner<ScanResultValue>>> groupedRunners = List<List<QueryRunner<ScanResultValue>>> groupedRunners =
partitionsGroupedByInterval.entrySet() partitionsGroupedByInterval.entrySet()
.stream() .stream()
@ -176,15 +177,20 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
.collect(Collectors.toList())) .collect(Collectors.toList()))
.collect(Collectors.toList()); .collect(Collectors.toList());
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
return Sequences.concat( // 5) Join all the results into a single sequence return Sequences.concat( // (5)
Sequences.map( // 4) Create a sequence of results from each runner group Sequences.map( // (4)
Sequences.simple(groupedRunners), Sequences.simple(groupedRunners),
runnerGroup -> runnerGroup ->
Sequences.map( // 3) Create a sequence of results from each runner in the group and flatmerge based on timestamp Sequences.map( // (3)
Sequences.simple(runnerGroup), Sequences.simple(runnerGroup),
(input) -> Sequences.concat( // 2) Combine the deaggregated ScanResultValues into a single sequence (input) -> Sequences.concat( // (2)
Sequences.map( // 1) Deaggregate each ScanResultValue returned by the query runners Sequences.map( // (1)
input.run(queryPlus, responseContext), input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues()) srv -> Sequences.simple(srv.toSingleEventScanResultValues())
) )
@ -210,7 +216,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
scanQueryConfig.getMaxRowsQueuedForOrdering() scanQueryConfig.getMaxRowsQueuedForOrdering()
); );
} }
}; };
} }