diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index f313e2ed5d2..6d6758b1926 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -61,6 +61,9 @@ public class ScanQueryQueryToolChest extends QueryToolChest queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); + if (scanQuery.getLimit() == Long.MAX_VALUE) { + return runner.run(queryPlusWithNonNullLegacy, responseContext); + } return new BaseSequence<>( new BaseSequence.IteratorMaker() { diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index 08abc595b21..ef8dc7ff6a0 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -20,11 +20,14 @@ package org.apache.druid.query.scan; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; @@ -34,7 +37,10 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; @RunWith(Parameterized.class) @@ -124,9 +130,95 @@ public class ScanQueryRunnerFactoryTest factory.priorityQueueSortAndLimit( inputSequence, query, - ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0)) + ImmutableList.of(new SegmentDescriptor(new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ), "1", 0)) ).toList(); + validateSortedOutput(output, expectedEventTimestamps); + } + + @Test + public void testNWayMerge() + { + List expectedEventTimestamps = new ArrayList<>(numElements * 3); + + List scanResultValues1 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } + + List scanResultValues2 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } + + List scanResultValues3 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } + + if ( query.getOrder() == ScanQuery.Order.DESCENDING) { + Collections.reverse(scanResultValues1); + Collections.reverse(scanResultValues2); + Collections.reverse(scanResultValues3); + } + + QueryRunner runnerSegment1Partition1 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues1); + + QueryRunner runnerSegment1Partition2 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues2); + + + QueryRunner runnerSegment2Partition1 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues3); + + QueryRunner runnerSegment2Partition2 = + (queryPlus, responseContext) -> Sequences.empty(); + + List>> groupedRunners = new ArrayList<>(2); + + if (query.getOrder() == ScanQuery.Order.DESCENDING) { + groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); + groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); + } else { + groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); + groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); + } + + expectedEventTimestamps.sort((o1, o2) -> { + int retVal = 0; + if (o1 > o2) { + retVal = 1; + } else if (o1 < o2) { + retVal = -1; + } + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + return retVal * -1; + } + return retVal; + }); + + List output = + factory.nWayMergeAndLimit( + groupedRunners, + QueryPlus.wrap(query), + ImmutableMap.of() + ).toList(); + + validateSortedOutput(output, expectedEventTimestamps); + } + + private void validateSortedOutput(List output, List expectedEventTimestamps) + { // check each scan result value has one event for (ScanResultValue srv : output) { if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) { @@ -155,10 +247,4 @@ public class ScanQueryRunnerFactoryTest Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat)); } } - - @Test - public void testNWayMerge() - { - - } }