Refactor n-way merge

This commit is contained in:
Justin Borromeo 2019-03-26 11:42:54 -07:00
parent 8a6bb1127c
commit 376e8bf906
3 changed files with 48 additions and 33 deletions

View File

@ -259,7 +259,7 @@ public class ScanQueryEngine
} }
/** /**
* If we're performing time-ordering, we want to scan through the first `limit` rows ignoring the number * If we're performing time-ordering, we want to scan through the first `limit` rows in each segment ignoring the number
* of rows already counted on other segments. * of rows already counted on other segments.
*/ */
private long calculateLimit(ScanQuery query, Map<String, Object> responseContext) private long calculateLimit(ScanQuery query, Map<String, Object> responseContext)

View File

@ -126,7 +126,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) { if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
// Use priority queue strategy // Use priority queue strategy
return sortAndLimitScanResultValuesPriorityQueue( return priorityQueueSortAndLimit(
Sequences.concat(Sequences.map( Sequences.concat(Sequences.map(
Sequences.simple(queryRunnersOrdered), Sequences.simple(queryRunnersOrdered),
input -> input.run(queryPlus, responseContext) input -> input.run(queryPlus, responseContext)
@ -182,35 +182,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
.collect(Collectors.toList())) .collect(Collectors.toList()))
.collect(Collectors.toList()); .collect(Collectors.toList());
// Starting from the innermost Sequences.map: return nWayMergeAndLimit(groupedRunners, queryPlus, responseContext);
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
return Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
query
)).reverse()
)
)
).limit(
Math.toIntExact(query.getLimit())
);
} }
throw new UOE( throw new UOE(
"Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported." "Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
@ -227,7 +199,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
} }
@VisibleForTesting @VisibleForTesting
Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue( Sequence<ScanResultValue> priorityQueueSortAndLimit(
Sequence<ScanResultValue> inputSequence, Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery, ScanQuery scanQuery,
List<SegmentDescriptor> descriptorsOrdered List<SegmentDescriptor> descriptorsOrdered
@ -297,6 +269,43 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return Sequences.simple(sortedElements); return Sequences.simple(sortedElements);
} }
@VisibleForTesting
Sequence<ScanResultValue> nWayMergeAndLimit(
List<List<QueryRunner<ScanResultValue>>> groupedRunners,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
)
{
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
return Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
(ScanQuery) queryPlus.getQuery()
)).reverse()
)
)
).limit(
Math.toIntExact(((ScanQuery) (queryPlus.getQuery())).getLimit())
);
}
@Override @Override
public QueryToolChest<ScanResultValue, ScanQuery> getToolchest() public QueryToolChest<ScanResultValue, ScanQuery> getToolchest()

View File

@ -121,7 +121,7 @@ public class ScanQueryRunnerFactoryTest
}); });
Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs); Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
List<ScanResultValue> output = List<ScanResultValue> output =
factory.sortAndLimitScanResultValuesPriorityQueue( factory.priorityQueueSortAndLimit(
inputSequence, inputSequence,
query, query,
ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0)) ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0))
@ -155,4 +155,10 @@ public class ScanQueryRunnerFactoryTest
Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat)); Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat));
} }
} }
@Test
public void testNWayMerge()
{
}
} }