Added test for n-way merge

This commit is contained in:
Justin Borromeo 2019-03-26 13:14:48 -07:00
parent 376e8bf906
commit fb858efbb7
2 changed files with 96 additions and 7 deletions

View File

@ -61,6 +61,9 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
final ScanQuery scanQuery = ((ScanQuery) (queryPlus.getQuery()))
.withNonNullLegacy(scanQueryConfig);
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(queryPlusWithNonNullLegacy, responseContext);
}
return new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
{

View File

@ -20,11 +20,14 @@
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
@ -34,7 +37,10 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
@ -124,9 +130,95 @@ public class ScanQueryRunnerFactoryTest
factory.priorityQueueSortAndLimit(
inputSequence,
query,
ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0))
ImmutableList.of(new SegmentDescriptor(new Interval(
DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1)
), "1", 0))
).toList();
validateSortedOutput(output, expectedEventTimestamps);
}
@Test
public void testNWayMerge()
{
List<Long> expectedEventTimestamps = new ArrayList<>(numElements * 3);
List<ScanResultValue> scanResultValues1 = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i++) {
long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis();
expectedEventTimestamps.add(timestamp);
scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
List<ScanResultValue> scanResultValues2 = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i++) {
long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis();
expectedEventTimestamps.add(timestamp);
scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
List<ScanResultValue> scanResultValues3 = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i++) {
long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis();
expectedEventTimestamps.add(timestamp);
scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
if ( query.getOrder() == ScanQuery.Order.DESCENDING) {
Collections.reverse(scanResultValues1);
Collections.reverse(scanResultValues2);
Collections.reverse(scanResultValues3);
}
QueryRunner<ScanResultValue> runnerSegment1Partition1 =
(queryPlus, responseContext) -> Sequences.simple(scanResultValues1);
QueryRunner<ScanResultValue> runnerSegment1Partition2 =
(queryPlus, responseContext) -> Sequences.simple(scanResultValues2);
QueryRunner<ScanResultValue> runnerSegment2Partition1 =
(queryPlus, responseContext) -> Sequences.simple(scanResultValues3);
QueryRunner<ScanResultValue> runnerSegment2Partition2 =
(queryPlus, responseContext) -> Sequences.empty();
List<List<QueryRunner<ScanResultValue>>> groupedRunners = new ArrayList<>(2);
if (query.getOrder() == ScanQuery.Order.DESCENDING) {
groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
} else {
groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
}
expectedEventTimestamps.sort((o1, o2) -> {
int retVal = 0;
if (o1 > o2) {
retVal = 1;
} else if (o1 < o2) {
retVal = -1;
}
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
return retVal * -1;
}
return retVal;
});
List<ScanResultValue> output =
factory.nWayMergeAndLimit(
groupedRunners,
QueryPlus.wrap(query),
ImmutableMap.of()
).toList();
validateSortedOutput(output, expectedEventTimestamps);
}
private void validateSortedOutput(List<ScanResultValue> output, List<Long> expectedEventTimestamps)
{
// check each scan result value has one event
for (ScanResultValue srv : output) {
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
@ -155,10 +247,4 @@ public class ScanQueryRunnerFactoryTest
Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat));
}
}
@Test
public void testNWayMerge()
{
}
}