diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java index 4aad2f9afa6..79c67031612 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java @@ -35,6 +35,21 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; + +/** + * This iterator supports iteration through a Sequence returned by a ScanResultValue QueryRunner. Its behaviour + * varies depending on whether the query is returning time-ordered values and whether the CTX_KEY_OUTERMOST flag is + * set to false. + * + * Behaviours: + * 1) No time ordering: expects a Sequence of ScanResultValues which each contain up to query.batchSize events. + * The iterator will be "done" when the limit of events is reached. The final ScanResultValue might contain + * fewer than batchSize events so that exactly the limit number of events is returned. + * 2) Time ordering, CTX_KEY_OUTERMOST is null or true: same behaviour as no time ordering. + * 3) Time ordering, CTX_KEY_OUTERMOST is false: the Sequence returned in this case should contain ScanResultValues + * that contain only one event each. This iterator will perform batching according to query.batchSize until + * the limit is reached. 
+ */
 public class ScanQueryLimitRowIterator implements CloseableIterator
 {
   private static final String TIME_ORDERING_SEGMENT_ID = "No segment ID available when using time ordering";
@@ -83,7 +98,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator singleEventScanResultValues = new ArrayList<>();
+  private static List multiEventScanResultValues = new ArrayList<>();
+  private static final ScanQuery.ResultFormat resultFormat = ScanQuery.ResultFormat.RESULT_FORMAT_LIST;
+
+  public ScanQueryLimitRowIteratorTest( // receives one (batchSize, limit) pair per parameterized run
+      final int batchSize,
+      final int limit
+  )
+  {
+    this.batchSize = batchSize;
+    this.limit = limit;
+  }
+
+  @Parameterized.Parameters(name = "{0} {1}")
+  public static Iterable constructorFeeder() // supplies the constructor arguments; run name is "<batchSize> <limit>"
+  {
+    List batchSizes = ImmutableList.of(1, 33);
+    List limits = ImmutableList.of(3, 10000);
+    return QueryRunnerTestHelper.cartesian( // presumably every batchSize x limit pairing -- helper is not shown here
+        batchSizes,
+        limits
+    );
+  }
+
+  @Before
+  public void setup() // rebuilds both input fixtures from scratch before every test
+  {
+    singleEventScanResultValues = new ArrayList<>();
+    multiEventScanResultValues = new ArrayList<>();
+    for (int i = 0; i < NUM_ELEMENTS; i++) { // NUM_ELEMENTS values holding one event each, random timestamps
+      singleEventScanResultValues.add(
+          ScanQueryTestHelper.generateScanResultValue(
+              ThreadLocalRandom.current().nextLong(),
+              resultFormat,
+              1
+          ));
+    }
+    for (int i = 0; i < NUM_ELEMENTS / batchSize; i++) { // the full batches of batchSize events
+      multiEventScanResultValues.add(
+          ScanQueryTestHelper.generateScanResultValue(
+              ThreadLocalRandom.current().nextLong(),
+              resultFormat,
+              batchSize
+          ));
+    }
+    multiEventScanResultValues.add( // trailing remainder batch; holds zero events when batchSize divides NUM_ELEMENTS
+        ScanQueryTestHelper.generateScanResultValue(
+            ThreadLocalRandom.current().nextLong(),
+            resultFormat,
+            NUM_ELEMENTS % batchSize
+        ));
+  }
+
+  /**
+   * Expect no batching to occur and limit to be applied.
+   *
+   * Runs an unordered scan (TimeOrder.NONE, CTX_KEY_OUTERMOST set to false) over the pre-batched
+   * multiEventScanResultValues fixture. A returned value may hold a number of events other than batchSize only
+   * when at most batchSize rows are still expected, in which case it must hold exactly the remaining rows. The
+   * total number of events returned must equal min(limit, NUM_ELEMENTS).
+   */
+  @Test
+  public void testNonTimeOrderedScan()
+  {
+    ScanQuery query = Druids.newScanQueryBuilder()
+                            .limit(limit)
+                            .timeOrder(ScanQuery.TimeOrder.NONE)
+                            .dataSource("some datasource")
+                            .batchSize(batchSize)
+                            .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
+                            .resultFormat(resultFormat)
+                            .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
+                            .build();
+    QueryPlus queryPlus = QueryPlus.wrap(query);
+    ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
+        ((queryInput, responseContext) -> Sequences.simple(multiEventScanResultValues)), // stub runner returning the fixture
+        queryPlus,
+        ImmutableMap.of()
+    );
+
+    int count = 0; // events seen so far
+    int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
+
+    while (itr.hasNext()) {
+      ScanResultValue curr = itr.next();
+      List> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
+      if (events.size() != batchSize) { // a size other than batchSize is only acceptable for the last rows
+        if (expectedNumRows - count > batchSize) {
+          Assert.fail("Batch size is incorrect");
+        } else {
+          Assert.assertEquals(expectedNumRows - count, events.size());
+        }
+      }
+      count += events.size();
+    }
+    Assert.assertEquals(expectedNumRows, count);
+  }
+
+  /**
+   * Expect batching to occur and limit to be applied on the Broker.  Input from Historical
+   * is a sequence of single-event ScanResultValues.
+   *
+   * The query is time-ordered (DESCENDING) and no context is set on it, so CTX_KEY_OUTERMOST is absent. Every
+   * returned value must hold batchSize events unless fewer than batchSize rows are still expected, in which case
+   * it must hold exactly the remaining rows. The total number of events returned must equal
+   * min(limit, NUM_ELEMENTS).
+   */
+  @Test
+  public void testBrokerTimeOrderedScan()
+  {
+    ScanQuery query = Druids.newScanQueryBuilder()
+                            .limit(limit)
+                            .timeOrder(ScanQuery.TimeOrder.DESCENDING)
+                            .dataSource("some datasource")
+                            .batchSize(batchSize)
+                            .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
+                            .resultFormat(resultFormat)
+                            .build();
+    QueryPlus queryPlus = QueryPlus.wrap(query);
+    ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
+        ((queryInput, responseContext) -> Sequences.simple(singleEventScanResultValues)), // stub runner returning the fixture
+        queryPlus,
+        ImmutableMap.of()
+    );
+
+    int count = 0; // events seen so far
+    int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
+    while (itr.hasNext()) {
+      ScanResultValue curr = itr.next();
+      List> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
+      if (events.size() != batchSize) { // only the final batch may be short
+        if (expectedNumRows - count >= batchSize) {
+          Assert.fail("Batch size is incorrect");
+        } else {
+          Assert.assertEquals(expectedNumRows - count, events.size());
+        }
+      }
+      count += events.size();
+    }
+    Assert.assertEquals(expectedNumRows, count);
+  }
+
+  /**
+   * Expect no batching to occur and limit to be applied.  Input is a sequence of sorted single-event ScanResultValues
+   * (unbatching and sorting occurs in ScanQueryRunnerFactory#mergeRunners()).
+   *
+   * The query is time-ordered (DESCENDING) with CTX_KEY_OUTERMOST set to false. Every returned value must hold
+   * exactly one event and the total number of events returned must equal min(limit, NUM_ELEMENTS). Note that the
+   * fixture built in setup() uses random timestamps and is not sorted by this test; only counts are asserted.
+   */
+  @Test
+  public void testHistoricalTimeOrderedScan()
+  {
+    ScanQuery query = Druids.newScanQueryBuilder()
+                            .limit(limit)
+                            .timeOrder(ScanQuery.TimeOrder.DESCENDING)
+                            .dataSource("some datasource")
+                            .batchSize(batchSize)
+                            .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
+                            .resultFormat(resultFormat)
+                            .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
+                            .build();
+
+    QueryPlus queryPlus = QueryPlus.wrap(query);
+    ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
+        ((queryInput, responseContext) -> Sequences.simple(singleEventScanResultValues)), // stub runner returning the fixture
+        queryPlus,
+        ImmutableMap.of()
+    );
+
+    int count = 0; // events seen so far
+    int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
+    while (itr.hasNext()) {
+      ScanResultValue curr = itr.next();
+      List> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
+      Assert.assertEquals(1, events.size()); // input must pass through one event at a time
+      count += events.size();
+    }
+    Assert.assertEquals(expectedNumRows, count);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
index d7cf516fe51..a6d4f9de797 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
@@ -20,24 +20,18 @@
 package org.apache.druid.query.scan;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryRunnerTestHelper;
-import org.apache.druid.segment.column.ColumnHolder;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 
@@ -109,7 +103,7 @@ public class ScanQueryRunnerFactoryTest
     for (int i = 0; i < numElements; i++) {
       long timestamp = (long) (ThreadLocalRandom.current().nextLong());
       expectedEventTimestamps.add(timestamp);
-      srvs.add(generateOneEventScanResultValue(timestamp, resultFormat));
+      srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
     }
     expectedEventTimestamps.sort((o1, o2) -> {
       int retVal = 0;
@@ -133,9 +127,9 @@ public class ScanQueryRunnerFactoryTest
     // check each scan result value has one event
     for (ScanResultValue srv : output) {
       if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
-        Assert.assertTrue(getEventsCompactedListResultFormat(srv).size() == 1);
+        Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1);
       } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-        Assert.assertTrue(getEventsListResultFormat(srv).size() == 1);
+        Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1);
       }
     }
 
@@ -153,37 +147,4 @@ public class ScanQueryRunnerFactoryTest
       }
     }
   }
-
-  private ScanResultValue generateOneEventScanResultValue(long timestamp, ScanQuery.ResultFormat resultFormat)
-  {
-    String segmentId = "some_segment_id";
-    List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
-    Object event;
-    if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-      Map<String, Object> eventMap = new HashMap<>();
-      eventMap.put(ColumnHolder.TIME_COLUMN_NAME, timestamp);
-      eventMap.put("name", "Feridun");
-      eventMap.put("count", 4);
-      event = eventMap;
-    } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
-      event = new ArrayList<>(Arrays.asList(
-          timestamp,
-          "Feridun",
-          4
-      ));
-    } else {
-      throw new UOE("Result format [%s] not supported yet", resultFormat.toString());
-    }
-    return new ScanResultValue(segmentId, columns, Collections.singletonList(event));
-  }
-
-  private List<Map<String, Object>> getEventsListResultFormat(ScanResultValue scanResultValue)
-  {
-    return (List<Map<String, Object>>) scanResultValue.getEvents();
-  }
-
-  private List<List<Object>> getEventsCompactedListResultFormat(ScanResultValue scanResultValue)
-  {
-    return (List<List<Object>>) scanResultValue.getEvents();
-  }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTestHelper.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTestHelper.java
new file mode 100644
index 00000000000..554c9229bfe
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTestHelper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.column.ColumnHolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers shared by the scan query tests for building {@link ScanResultValue} fixtures and for viewing
+ * their events with a concrete element type.
+ */
+public class ScanQueryTestHelper
+{
+  /**
+   * Builds a {@link ScanResultValue} holding {@code batchSize} events that all share one timestamp.
+   *
+   * Each event carries the time column set to {@code timestamp}, a "name" of "Feridun" and a "count" equal to the
+   * event's index within the batch.  A {@code batchSize} of zero yields a value with an empty event list.
+   *
+   * @param timestamp    value stored in the time column of every generated event
+   * @param resultFormat RESULT_FORMAT_LIST produces map events, RESULT_FORMAT_COMPACTED_LIST produces list events
+   * @param batchSize    number of events to generate
+   *
+   * @return a value with segment id "some_segment_id", the columns (time, "name", "count") and the generated events
+   *
+   * @throws UOE if {@code batchSize} is positive and {@code resultFormat} is neither of the two formats above
+   */
+  public static ScanResultValue generateScanResultValue(long timestamp, ScanQuery.ResultFormat resultFormat, int batchSize)
+  {
+    String segmentId = "some_segment_id";
+    List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
+    List<Object> events = new ArrayList<>();
+    for (int i = 0; i < batchSize; i++) {
+      Object event;
+      if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
+        Map<String, Object> eventMap = new HashMap<>();
+        eventMap.put(ColumnHolder.TIME_COLUMN_NAME, timestamp);
+        eventMap.put("name", "Feridun");
+        eventMap.put("count", i);
+        event = eventMap;
+      } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
+        event = new ArrayList<>(Arrays.asList(
+            timestamp,
+            "Feridun",
+            i
+        ));
+      } else {
+        throw new UOE("Result format [%s] not supported yet", resultFormat.toString());
+      }
+      events.add(event);
+    }
+    return new ScanResultValue(segmentId, columns, events);
+  }
+
+  /**
+   * Views the events of a value produced in RESULT_FORMAT_LIST as a list of column-name-to-value maps.
+   *
+   * @param scanResultValue value whose events are expected to be map events
+   *
+   * @return the value's events, cast as-is without copying or checking the element types
+   */
+  public static List<Map<String, Object>> getEventsListResultFormat(ScanResultValue scanResultValue)
+  {
+    return (List<Map<String, Object>>) scanResultValue.getEvents();
+  }
+
+  /**
+   * Views the events of a value produced in RESULT_FORMAT_COMPACTED_LIST as a list of per-event value lists.
+   *
+   * @param scanResultValue value whose events are expected to be list events
+   *
+   * @return the value's events, cast as-is without copying or checking the element types
+   */
+  public static List<List<Object>> getEventsCompactedListResultFormat(ScanResultValue scanResultValue)
+  {
+    return (List<List<Object>>) scanResultValue.getEvents();
+  }
+
+}