Support querying realtime segments using time-ordered scan queries and fix broken scan queries without time column (#7454)

* Update scan query runner factory to accept SpecificSegmentSpec

* nit

* Sorry, Travis

* Improve logging and fix doc

* Bug fix

* Friendlier error msgs and tests to cover bug

* Address Gian's comments

* Fix doc

* Added tests for empty and null column list

* Style

* Fix checking the wrong order (looking at the query param when it should be
looking at the null-handled order)

* Add test case for null order

* Fix ScanQueryRunnerTest

* Forbidden APIs fixed
Author: Justin Borromeo, 2019-04-12 19:08:34 -07:00 (committed by Gian Merlino)
Parent: 7d9cb6944b
Commit: 85f10ed0d0
10 changed files with 761 additions and 290 deletions

View File: scan-query.md

@@ -61,7 +61,7 @@ The following are the main parameters for Scan queries:
|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
|batchSize|The maximum number of rows buffered before being returned to the client. Default is `20480`.|no|
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either in legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and will default to an order of "none".|none|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met.|none|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no|
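
For illustration, a minimal sketch (not part of this commit) of a time-ordered scan query built with the `Druids.newScanQueryBuilder()` API exercised in the tests below; the datasource name, interval, and "page" column are hypothetical:

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.Interval;
import java.util.Collections;

// __time must appear in `columns` (or the column list must be left empty,
// meaning "all columns") for order=ascending/descending to pass validation.
ScanQuery query = Druids.newScanQueryBuilder()
    .dataSource("wikipedia")                      // hypothetical datasource
    .intervals(new MultipleIntervalSegmentSpec(
        Collections.singletonList(
            new Interval(DateTimes.of("2015-09-12"), DateTimes.of("2015-09-13")))))
    .columns(ImmutableList.of("__time", "page"))  // hypothetical column list
    .order(ScanQuery.Order.ASCENDING)
    .limit(1000)
    .build();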

View File: ScanQuery.java

@@ -34,6 +34,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
import java.util.List;
@@ -149,6 +150,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
this.columns = columns;
this.legacy = legacy;
this.order = (order == null) ? Order.NONE : order;
if (this.order != Order.NONE) {
Preconditions.checkArgument(
columns == null || columns.size() == 0 || columns.contains(ColumnHolder.TIME_COLUMN_NAME),
"The __time column must be selected if the results are time-ordered."
);
}
this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
}
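
The effect of this precondition, sketched with the builder API (`intervalSpec` stands for any query segment spec; these cases mirror the new ScanQueryTest below):

// Passes: __time is selected explicitly.
Druids.newScanQueryBuilder()
      .order(ScanQuery.Order.ASCENDING)
      .columns(ImmutableList.of("__time", "quality"))
      .dataSource("source")
      .intervals(intervalSpec)
      .build();

// Passes: a null or empty column list means "all columns", which includes __time.
Druids.newScanQueryBuilder()
      .order(ScanQuery.Order.DESCENDING)
      .columns(ImmutableList.of())
      .dataSource("source")
      .intervals(intervalSpec)
      .build();

// Throws IllegalArgumentException: time-ordered, but __time is not selected.
Druids.newScanQueryBuilder()
      .order(ScanQuery.Order.ASCENDING)
      .columns(ImmutableList.of("quality"))
      .dataSource("source")
      .intervals(intervalSpec)
      .build();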
@@ -256,6 +263,9 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@Override
public Ordering<ScanResultValue> getResultOrdering()
{
if (order == Order.NONE) {
return Ordering.natural();
}
return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse();
}
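
This ordering is what the broker-side n-way merge consumes when combining per-runner result sequences; a sketch of that usage (the two inner sequences are placeholders), matching testMergeSequenceForResults in the new ScanQueryTest below:

Sequence<ScanResultValue> merged = Sequences.simple(
    ImmutableList.of(runnerOneResults, runnerTwoResults)  // placeholder per-runner sequences
).flatMerge(seq -> seq, query.getResultOrdering());       // time-ordered iff query order != NONE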

View File: ScanQueryRunnerFactory.java

@@ -40,11 +40,14 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedHashMap;
@@ -111,17 +114,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return returnedRows;
}
} else {
// Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need
// to be present for a 1:1 matching of intervals with query runners. The other types of segment spec condense
// the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals
// and query runners.
if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+ " of type MultipleSpecificSegmentSpec");
}
// Ascending time order for both descriptors and query runners by default
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors();
List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
@@ -286,6 +279,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return Sequences.simple(sortedElements);
}
@VisibleForTesting
List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
{
// Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
// segment descriptors need to be present for a 1:1 matching of intervals with query runners.
// The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
// the 1:1 relationship between intervals and query runners.
List<SegmentDescriptor> descriptorsOrdered;
if (spec instanceof MultipleSpecificSegmentSpec) {
// Ascending time order for both descriptors and query runners by default
descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
} else if (spec instanceof SpecificSegmentSpec) {
descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
} else {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+ " of type MultipleSpecificSegmentSpec or SpecificSegmentSpec; a [%s] was received instead.",
spec.getClass().getSimpleName());
}
return descriptorsOrdered;
}
@VisibleForTesting
Sequence<ScanResultValue> nWayMergeAndLimit(
List<List<QueryRunner<ScanResultValue>>> groupedRunners,
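
Why only the two specific segment specs are accepted: the other spec types condense (merge) adjacent intervals, which destroys the 1:1 interval-to-runner mapping. A sketch of that condensing behavior, assuming two abutting segment intervals:

List<Interval> condensed = JodaUtils.condenseIntervals(
    ImmutableList.of(
        new Interval(DateTimes.of("2015-01-01"), DateTimes.of("2015-01-02")),
        new Interval(DateTimes.of("2015-01-02"), DateTimes.of("2015-01-03"))  // abuts the first
    )
);
// condensed holds the single interval 2015-01-01/2015-01-03, so two query runners
// would now map onto one interval and time ordering could not pair them up.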

View File: ScanResultValue.java

@@ -21,6 +21,7 @@ package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.column.ColumnHolder;
@@ -78,9 +79,16 @@ public class ScanResultValue implements Comparable<ScanResultValue>
public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
{
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
return (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
Long timestamp = (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
if (timestamp == null) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
return timestamp;
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
if (timeColumnIndex == -1) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
return (Long) firstEvent.get(timeColumnIndex);
}
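
A sketch of the event shapes this method handles, mirroring the ScanResultValue construction in the new ScanQueryTest below:

// RESULT_FORMAT_LIST: events are a List of Map<String, Object>.
Map<String, Object> event = new HashMap<>();
event.put(ColumnHolder.TIME_COLUMN_NAME, 42L);
ScanResultValue listValue = new ScanResultValue(
    "segmentId",
    Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
    Collections.singletonList(event)
);
listValue.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);  // 42

// RESULT_FORMAT_COMPACTED_LIST: events are a List of List<Object>; the timestamp
// is located by the index of __time in the column list.
ScanResultValue compactedValue = new ScanResultValue(
    "segmentId",
    Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
    Collections.singletonList(Collections.singletonList(42L))
);
compactedValue.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);  // 42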

View File: ScanResultValueTimestampComparator.java

@@ -42,8 +42,7 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResult
@Override
public int compare(ScanResultValue o1, ScanResultValue o2)
{
int comparison;
comparison = Longs.compare(
int comparison = Longs.compare(
o1.getFirstEventTimestamp(scanQuery.getResultFormat()),
o2.getFirstEventTimestamp(scanQuery.getResultFormat()));
if (scanQuery.getOrder().equals(ScanQuery.Order.DESCENDING)) {

View File: MultipleSpecificSegmentSpec.java

@@ -21,7 +21,6 @@ package org.apache.druid.query.spec;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.query.Query;
@@ -64,14 +63,7 @@ public class MultipleSpecificSegmentSpec implements QuerySegmentSpec
intervals = JodaUtils.condenseIntervals(
Iterables.transform(
descriptors,
new Function<SegmentDescriptor, Interval>()
{
@Override
public Interval apply(SegmentDescriptor input)
{
return input.getInterval();
}
}
input -> input.getInterval()
)
);

View File: ScanQueryRunnerFactoryTest.java

@@ -31,9 +31,15 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -43,12 +49,9 @@ import java.util.Collections;
import java.util.List;
@RunWith(Parameterized.class)
@RunWith(Enclosed.class)
public class ScanQueryRunnerFactoryTest
{
private int numElements;
private ScanQuery query;
private ScanQuery.ResultFormat resultFormat;
private static final ScanQueryRunnerFactory factory = new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
@@ -59,7 +62,14 @@ public class ScanQueryRunnerFactoryTest
new ScanQueryConfig()
);
public ScanQueryRunnerFactoryTest(
@RunWith(Parameterized.class)
public static class ScanQueryRunnerFactoryParameterizedTest
{
private int numElements;
private ScanQuery query;
private ScanQuery.ResultFormat resultFormat;
public ScanQueryRunnerFactoryParameterizedTest(
final int numElements,
final int batchSize,
final long limit,
@@ -256,3 +266,57 @@ public class ScanQueryRunnerFactoryTest
}
}
}
public static class ScanQueryRunnerFactoryNonParameterizedTest
{
private SegmentDescriptor descriptor = new SegmentDescriptor(new Interval(
DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1)
), "1", 0);
@Test
public void testGetValidSegmentDescriptorsFromSpec()
{
QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec(
Collections.singletonList(
descriptor
)
);
QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor);
List<SegmentDescriptor> descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec);
Assert.assertEquals(1, descriptors.size());
Assert.assertEquals(descriptor, descriptors.get(0));
descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec);
Assert.assertEquals(1, descriptors.size());
Assert.assertEquals(descriptor, descriptors.get(0));
}
@Test(expected = UOE.class)
public void testGetSegmentDescriptorsFromInvalidIntervalSpec()
{
QuerySegmentSpec multiIntervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(
new Interval(
DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1)
)
)
);
factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec);
}
@Test(expected = UOE.class)
public void testGetSegmentDescriptorsFromInvalidLegacySpec()
{
QuerySegmentSpec legacySpec = new LegacySegmentSpec(
new Interval(
DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1)
)
);
factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec);
}
}
}

View File: ScanQueryRunnerTest.java

@@ -67,6 +67,7 @@ import java.util.Map;
import java.util.Set;
/**
*
*/
@RunWith(Parameterized.class)
public class ScanQueryRunnerTest
@@ -524,7 +525,11 @@ public class ScanQueryRunnerTest
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.columns(
QueryRunnerTestHelper.timeDimension,
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.indexMetric
)
.limit(limit)
.order(ScanQuery.Order.ASCENDING)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
@@ -556,7 +561,7 @@
};
final List<List<Map<String, Object>>> ascendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
@@ -565,9 +570,35 @@
},
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
if (legacy) {
for (List<Map<String, Object>> batch : ascendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
}
}
} else {
for (List<Map<String, Object>> batch : ascendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis());
}
}
}
List<ScanResultValue> ascendingExpectedResults = toExpected(
ascendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
legacy ?
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
getTimestampName(),
"quality",
"index"
) :
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
"quality",
"index"
),
0,
limit
);
@@ -583,7 +614,11 @@
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.columns(
QueryRunnerTestHelper.timeDimension,
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.indexMetric
)
.limit(limit)
.order(ScanQuery.Order.DESCENDING)
.build();
@@ -616,7 +651,7 @@
ArrayUtils.reverse(expectedRet);
final List<List<Map<String, Object>>> descendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
@@ -625,9 +660,34 @@
},
expectedRet
);
if (legacy) {
for (List<Map<String, Object>> batch : descendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
}
}
} else {
for (List<Map<String, Object>> batch : descendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis());
}
}
}
List<ScanResultValue> descendingExpectedResults = toExpected(
descendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
legacy ?
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
getTimestampName(),
// getTimestampName() always returns the legacy timestamp when legacy is true
"quality",
"index"
) :
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
"quality",
"index"
),
0,
limit
);
@@ -666,7 +726,11 @@
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.columns(
QueryRunnerTestHelper.timeDimension,
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.indexMetric
)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.order(ScanQuery.Order.ASCENDING)
.limit(limit)
@@ -676,7 +740,7 @@
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
final List<List<Map<String, Object>>> ascendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
@@ -685,9 +749,34 @@
},
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
if (legacy) {
for (List<Map<String, Object>> batch : ascendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
}
}
} else {
for (List<Map<String, Object>> batch : ascendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", ((DateTimes.of((String) event.get("__time"))).getMillis()));
}
}
}
List<ScanResultValue> ascendingExpectedResults = toExpected(
ascendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
legacy ?
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
getTimestampName(),
// getTimestampName() always returns the legacy timestamp when legacy is true
"quality",
"index"
) :
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
"quality",
"index"
),
0,
limit
);
@@ -727,7 +816,11 @@
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.columns(
QueryRunnerTestHelper.timeDimension,
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.indexMetric
)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.order(ScanQuery.Order.DESCENDING)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
@@ -740,7 +833,7 @@
ArrayUtils.reverse(expectedRet);
final List<List<Map<String, Object>>> descendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
@@ -749,9 +842,34 @@
},
expectedRet //segments in reverse order from above
);
if (legacy) {
for (List<Map<String, Object>> batch : descendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
}
}
} else {
for (List<Map<String, Object>> batch : descendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", ((DateTimes.of((String) event.get("__time"))).getMillis()));
}
}
}
List<ScanResultValue> descendingExpectedResults = toExpected(
descendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
legacy ?
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
getTimestampName(),
// getTimestampName() always returns the legacy timestamp when legacy is true
"quality",
"index"
) :
Lists.newArrayList(
QueryRunnerTestHelper.timeDimension,
"quality",
"index"
),
0,
limit
);
@@ -760,7 +878,6 @@
}
}
private List<List<Map<String, Object>>> toFullEvents(final String[]... valueSet)
{
return toEvents(
@@ -799,13 +916,9 @@
Lists.newArrayList(
Iterables.transform(
values,
new Function<String, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(String input)
{
input -> {
Map<String, Object> event = new HashMap<>();
String[] values = input.split("\\t");
String[] values1 = input.split("\\t");
for (int i = 0; i < dimSpecs.length; i++) {
if (dimSpecs[i] == null || i >= dimSpecs.length) {
continue;
@@ -844,7 +957,7 @@
event.put("quality_uniques", collector);
}
if (i >= values.length) {
if (i >= values1.length) {
continue;
}
@@ -852,19 +965,18 @@
event.put(
specs[0],
specs.length == 1 || specs[1].equals("STRING") ? values[i] :
specs[1].equals("TIME") ? toTimestamp(values[i]) :
specs[1].equals("FLOAT") ? Float.valueOf(values[i]) :
specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) :
specs[1].equals("LONG") ? Long.valueOf(values[i]) :
specs.length == 1 || specs[1].equals("STRING") ? values1[i] :
specs[1].equals("TIME") ? toTimestamp(values1[i]) :
specs[1].equals("FLOAT") ? Float.valueOf(values1[i]) :
specs[1].equals("DOUBLE") ? Double.valueOf(values1[i]) :
specs[1].equals("LONG") ? Long.valueOf(values1[i]) :
specs[1].equals("NULL") ? null :
specs[1].equals("STRINGS") ? Arrays.asList(values[i].split("\u0001")) :
values[i]
specs[1].equals("STRINGS") ? Arrays.asList(values1[i].split("\u0001")) :
values1[i]
);
}
return event;
}
}
)
)
);
@@ -969,7 +1081,6 @@
} else {
Assert.assertEquals("invalid value for " + ac.getKey(), exVal, actVal);
}
}
}

View File: ScanQueryTest.java (new file)

@@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
public class ScanQueryTest
{
private static QuerySegmentSpec intervalSpec;
private static ScanResultValue s1;
private static ScanResultValue s2;
private static ScanResultValue s3;
@BeforeClass
public static void setup()
{
intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(
new Interval(DateTimes.of("2012-01-01"), DateTimes.of("2012-01-01").plusHours(1))
)
);
ArrayList<HashMap<String, Object>> events1 = new ArrayList<>();
HashMap<String, Object> event1 = new HashMap<>();
event1.put(ColumnHolder.TIME_COLUMN_NAME, new Long(42));
events1.add(event1);
s1 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events1
);
ArrayList<HashMap<String, Object>> events2 = new ArrayList<>();
HashMap<String, Object> event2 = new HashMap<>();
event2.put(ColumnHolder.TIME_COLUMN_NAME, new Long(43));
events2.add(event2);
s2 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events2
);
// ScanResultValue s3 has no time column
ArrayList<HashMap<String, Object>> events3 = new ArrayList<>();
HashMap<String, Object> event3 = new HashMap<>();
event3.put("yah", "yeet");
events3.add(event3);
s3 = new ScanResultValue(
"segmentId",
Collections.singletonList("yah"),
events3
);
}
@Test(expected = IllegalArgumentException.class)
public void testAscendingScanQueryWithInvalidColumns()
{
Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)
.columns(ImmutableList.of("not time", "also not time"))
.dataSource("source")
.intervals(intervalSpec)
.build();
}
@Test(expected = IllegalArgumentException.class)
public void testDescendingScanQueryWithInvalidColumns()
{
Druids.newScanQueryBuilder()
.order(ScanQuery.Order.DESCENDING)
.columns(ImmutableList.of("not time", "also not time"))
.dataSource("source")
.intervals(intervalSpec)
.build();
}
// No assertions because we're checking that no IllegalArgumentExceptions are thrown
@Test
public void testValidScanQueryInitialization()
{
List<ScanQuery.Order> nonOrderedOrders = Arrays.asList(null, ScanQuery.Order.NONE);
for (ScanQuery.Order order : nonOrderedOrders) {
Druids.newScanQueryBuilder()
.order(order)
.columns(ImmutableList.of("not time"))
.dataSource("source")
.intervals(intervalSpec)
.build();
Druids.newScanQueryBuilder()
.order(order)
.dataSource("source")
.intervals(intervalSpec)
.build();
Druids.newScanQueryBuilder()
.order(order)
.columns(ImmutableList.of())
.dataSource("source")
.intervals(intervalSpec)
.build();
Druids.newScanQueryBuilder()
.order(order)
.columns(ImmutableList.of("__time"))
.dataSource("source")
.intervals(intervalSpec)
.build();
}
Set<ScanQuery.Order> orderedOrders = ImmutableSet.of(ScanQuery.Order.ASCENDING, ScanQuery.Order.DESCENDING);
for (ScanQuery.Order order : orderedOrders) {
Druids.newScanQueryBuilder()
.order(order)
.columns((List<String>) null)
.dataSource("source")
.intervals(intervalSpec)
.build();
Druids.newScanQueryBuilder()
.order(order)
.columns(ImmutableList.of())
.dataSource("source")
.intervals(intervalSpec)
.build();
Druids.newScanQueryBuilder()
.order(order)
.dataSource("source")
.intervals(intervalSpec)
.build();
Druids.newScanQueryBuilder()
.order(order)
.columns(ImmutableList.of("__time", "col2"))
.dataSource("source")
.intervals(intervalSpec)
.build();
}
}
// Validates that getResultOrdering will work for the broker n-way merge
@Test
public void testMergeSequenceForResults()
{
// Should be able to handle merging s1, s2, s3
ScanQuery noOrderScan = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.NONE)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
// Should only handle s1 and s2
ScanQuery descendingOrderScan = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
// Should only handle s1 and s2
ScanQuery ascendingOrderScan = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
// No Order
Sequence<ScanResultValue> noOrderSeq =
Sequences.simple(
ImmutableList.of(
Sequences.simple(ImmutableList.of(s1, s3)),
Sequences.simple(ImmutableList.of(s2))
)
).flatMerge(seq -> seq, noOrderScan.getResultOrdering());
List<ScanResultValue> noOrderList = noOrderSeq.toList();
Assert.assertEquals(3, noOrderList.size());
// Ascending
Sequence<ScanResultValue> ascendingOrderSeq = Sequences.simple(
ImmutableList.of(
Sequences.simple(ImmutableList.of(s1)),
Sequences.simple(ImmutableList.of(s2))
)
).flatMerge(seq -> seq, ascendingOrderScan.getResultOrdering());
List<ScanResultValue> ascendingList = ascendingOrderSeq.toList();
Assert.assertEquals(2, ascendingList.size());
Assert.assertEquals(s1, ascendingList.get(0));
Assert.assertEquals(s2, ascendingList.get(1));
// Descending
Sequence<ScanResultValue> descendingOrderSeq = Sequences.simple(
ImmutableList.of(
Sequences.simple(ImmutableList.of(s1)),
Sequences.simple(ImmutableList.of(s2))
)
).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering());
List<ScanResultValue> descendingList = descendingOrderSeq.toList();
Assert.assertEquals(2, descendingList.size());
Assert.assertEquals(s2, descendingList.get(0));
Assert.assertEquals(s1, descendingList.get(1));
}
@Test(expected = ISE.class)
public void testTimeOrderingWithoutTimeColumn()
{
ScanQuery descendingOrderScan = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
// This should fail because s3 doesn't have a timestamp
Sequence<ScanResultValue> borkedSequence = Sequences.simple(
ImmutableList.of(
Sequences.simple(ImmutableList.of(s1)),
Sequences.simple(ImmutableList.of(s2, s3))
)
).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering());
// This should throw an ISE
List<ScanResultValue> res = borkedSequence.toList();
}
}

View File: ScanResultValueTimestampComparatorTest.java

@@ -49,7 +49,7 @@ public class ScanResultValueTimestampComparatorTest
}
@Test
public void comparisonDescendingListTest()
public void testComparisonDescendingList()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.DESCENDING)
@@ -86,7 +86,7 @@
}
@Test
public void comparisonAscendingListTest()
public void testComparisonAscendingList()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)
@@ -123,7 +123,7 @@
}
@Test
public void comparisonDescendingCompactedListTest()
public void testComparisonDescendingCompactedList()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.DESCENDING)
@@ -158,7 +158,7 @@
}
@Test
public void comparisonAscendingCompactedListTest()
public void testAscendingCompactedList()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)