Wrote tests and added Javadoc

This commit is contained in:
Justin Borromeo 2019-03-01 13:38:29 -08:00
parent 5ff59f5ca6
commit 47c970b5f4
4 changed files with 303 additions and 45 deletions

View File

@ -35,6 +35,21 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* This iterator supports iteration through a Sequence returned by a ScanResultValue QueryRunner. Its behaviour
* varies depending on whether the query is returning time-ordered values and whether the CTX_KEY_OUTERMOST flag is
* set as false.
*
* Behaviours:
* 1) No time ordering: expects a Sequence of ScanResultValues which each contain up to query.batchSize events.
* The iterator will be "done" when the limit of events is reached. The final ScanResultValue might contain
* fewer than batchSize events so that the limit number of events is returned.
* 2) Time Ordering, CTX_KEY_OUTERMOST==null or true: Same behaviour as no time ordering
* 3) Time Ordering, CTX_KEY_OUTERMOST=false: The Sequence returned in this case should contain ScanResultValues
* that contain only one event each. This iterator will perform batching according to query.batchSize until
* the limit is reached.
*/
public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private static final String TIME_ORDERING_SEGMENT_ID = "No segment ID available when using time ordering";
@ -83,7 +98,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
// We don't want to perform batching at the historical-level if we're performing time ordering
// We don't want to perform batching at the historical-level if we're time ordering
if (query.getTimeOrder() == ScanQuery.TimeOrder.NONE ||
!query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true)) {
ScanResultValue batch = yielder.get();
@ -95,9 +110,9 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
} else {
// last batch
// single batch length is <= Integer.MAX_VALUE, so this should not overflow
int left = (int) (limit - count);
int numLeft = (int) (limit - count);
count = limit;
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, numLeft));
}
} else {
// Perform single-event ScanResultValue batching. Each scan result value in this case will only have one event

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
public class ScanQueryLimitRowIteratorTest
{
private static final int NUM_ELEMENTS = 1000;
private static int batchSize;
private static int limit;
private static List<ScanResultValue> singleEventScanResultValues = new ArrayList<>();
private static List<ScanResultValue> multiEventScanResultValues = new ArrayList<>();
private static final ScanQuery.ResultFormat resultFormat = ScanQuery.ResultFormat.RESULT_FORMAT_LIST;
public ScanQueryLimitRowIteratorTest(
final int batchSize,
final int limit
)
{
this.batchSize = batchSize;
this.limit = limit;
}
@Parameterized.Parameters(name = "{0} {1}")
public static Iterable<Object[]> constructorFeeder()
{
List<Integer> batchSizes = ImmutableList.of(1, 33);
List<Integer> limits = ImmutableList.of(3, 10000);
return QueryRunnerTestHelper.cartesian(
batchSizes,
limits
);
}
@Before
public void setup()
{
singleEventScanResultValues = new ArrayList<>();
multiEventScanResultValues = new ArrayList<>();
for (int i = 0; i < NUM_ELEMENTS; i++) {
singleEventScanResultValues.add(
ScanQueryTestHelper.generateScanResultValue(
ThreadLocalRandom.current().nextLong(),
resultFormat,
1
));
}
for (int i = 0; i < NUM_ELEMENTS / batchSize; i++) {
multiEventScanResultValues.add(
ScanQueryTestHelper.generateScanResultValue(
ThreadLocalRandom.current().nextLong(),
resultFormat,
batchSize
));
}
multiEventScanResultValues.add(
ScanQueryTestHelper.generateScanResultValue(
ThreadLocalRandom.current().nextLong(),
resultFormat,
NUM_ELEMENTS % batchSize
));
}
/**
* Expect no batching to occur and limit to be applied
*/
@Test
public void testNonTimeOrderedScan()
{
ScanQuery query = Druids.newScanQueryBuilder()
.limit(limit)
.timeOrder(ScanQuery.TimeOrder.NONE)
.dataSource("some datasource")
.batchSize(batchSize)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.resultFormat(resultFormat)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
.build();
QueryPlus<ScanResultValue> queryPlus = QueryPlus.wrap(query);
ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
((queryInput, responseContext) -> Sequences.simple(multiEventScanResultValues)),
queryPlus,
ImmutableMap.of()
);
int count = 0;
int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
while (itr.hasNext()) {
ScanResultValue curr = itr.next();
List<Map<String, Object>> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
if (events.size() != batchSize) {
if (expectedNumRows - count > batchSize) {
Assert.fail("Batch size is incorrect");
} else {
Assert.assertEquals(expectedNumRows - count, events.size());
}
}
count += events.size();
}
Assert.assertEquals(expectedNumRows, count);
}
/**
* Expect batching to occur and limit to be applied on the Broker. Input from Historical
* is a sequence of single-event ScanResultValues.
*/
@Test
public void testBrokerTimeOrderedScan()
{
ScanQuery query = Druids.newScanQueryBuilder()
.limit(limit)
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.dataSource("some datasource")
.batchSize(batchSize)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.resultFormat(resultFormat)
.build();
QueryPlus<ScanResultValue> queryPlus = QueryPlus.wrap(query);
ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
((queryInput, responseContext) -> Sequences.simple(singleEventScanResultValues)),
queryPlus,
ImmutableMap.of()
);
int count = 0;
int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
while (itr.hasNext()) {
ScanResultValue curr = itr.next();
List<Map<String, Object>> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
if (events.size() != batchSize) {
if (expectedNumRows - count >= batchSize) {
Assert.fail("Batch size is incorrect");
} else {
Assert.assertEquals(expectedNumRows - count, events.size());
}
}
count += events.size();
}
Assert.assertEquals(expectedNumRows, count);
}
/**
* Expect no batching to occur and limit to be applied. Input is a sequence of sorted single-event ScanResultValues
* (unbatching and sorting occurs in ScanQueryRunnerFactory#mergeRunners()).
*/
@Test
public void testHistoricalTimeOrderedScan()
{
ScanQuery query = Druids.newScanQueryBuilder()
.limit(limit)
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.dataSource("some datasource")
.batchSize(batchSize)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.resultFormat(resultFormat)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
.build();
QueryPlus<ScanResultValue> queryPlus = QueryPlus.wrap(query);
ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
((queryInput, responseContext) -> Sequences.simple(singleEventScanResultValues)),
queryPlus,
ImmutableMap.of()
);
int count = 0;
int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
while (itr.hasNext()) {
ScanResultValue curr = itr.next();
List<Map<String, Object>> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
Assert.assertEquals(1, events.size());
count += events.size();
}
Assert.assertEquals(expectedNumRows, count);
}
}

View File

@ -20,24 +20,18 @@
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.segment.column.ColumnHolder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@ -109,7 +103,7 @@ public class ScanQueryRunnerFactoryTest
for (int i = 0; i < numElements; i++) {
long timestamp = (long) (ThreadLocalRandom.current().nextLong());
expectedEventTimestamps.add(timestamp);
srvs.add(generateOneEventScanResultValue(timestamp, resultFormat));
srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
expectedEventTimestamps.sort((o1, o2) -> {
int retVal = 0;
@ -133,9 +127,9 @@ public class ScanQueryRunnerFactoryTest
// check each scan result value has one event
for (ScanResultValue srv : output) {
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
Assert.assertTrue(getEventsCompactedListResultFormat(srv).size() == 1);
Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1);
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Assert.assertTrue(getEventsListResultFormat(srv).size() == 1);
Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1);
}
}
@ -153,37 +147,4 @@ public class ScanQueryRunnerFactoryTest
}
}
}
private ScanResultValue generateOneEventScanResultValue(long timestamp, ScanQuery.ResultFormat resultFormat)
{
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
Object event;
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Map<String, Object> eventMap = new HashMap<>();
eventMap.put(ColumnHolder.TIME_COLUMN_NAME, timestamp);
eventMap.put("name", "Feridun");
eventMap.put("count", 4);
event = eventMap;
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
event = new ArrayList<>(Arrays.asList(
timestamp,
"Feridun",
4
));
} else {
throw new UOE("Result format [%s] not supported yet", resultFormat.toString());
}
return new ScanResultValue(segmentId, columns, Collections.singletonList(event));
}
private List<Map<String, Object>> getEventsListResultFormat(ScanResultValue scanResultValue)
{
return (List<Map<String, Object>>) scanResultValue.getEvents();
}
private List<List<Object>> getEventsCompactedListResultFormat(ScanResultValue scanResultValue)
{
return (List<List<Object>>) scanResultValue.getEvents();
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.column.ColumnHolder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ScanQueryTestHelper
{
public static ScanResultValue generateScanResultValue(long timestamp, ScanQuery.ResultFormat resultFormat, int batchSize)
{
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
List<Object> events = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
Object event;
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Map<String, Object> eventMap = new HashMap<>();
eventMap.put(ColumnHolder.TIME_COLUMN_NAME, timestamp);
eventMap.put("name", "Feridun");
eventMap.put("count", i);
event = eventMap;
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
event = new ArrayList<>(Arrays.asList(
timestamp,
"Feridun",
i
));
} else {
throw new UOE("Result format [%s] not supported yet", resultFormat.toString());
}
events.add(event);
}
return new ScanResultValue(segmentId, columns, events);
}
public static List<Map<String, Object>> getEventsListResultFormat(ScanResultValue scanResultValue)
{
return (List<Map<String, Object>>) scanResultValue.getEvents();
}
public static List<List<Object>> getEventsCompactedListResultFormat(ScanResultValue scanResultValue)
{
return (List<List<Object>>) scanResultValue.getEvents();
}
}