Scan: Add "orderBy" parameter. (#11930)

* Scan: Add "orderBy" parameter.

This patch adds an API for requesting non-time orderings, although it
does not actually add the ability to execute such queries.

The changes are done in such a way that no matter how Scan query objects
are constructed, they will have a correct "getOrderBy". This will enable
us to switch the execution to exclusively use "getOrderBy" later on when
it's implemented.

Scan queries are serialized such that they only include "order" (time
order) if the ordering is time-based, and they only include "orderBy" if
the ordering is non-time-based. This maximizes compatibility with
the existing API while also providing a clean look for formatted queries.

Because this patch does not include execution logic, if someone actually
tries to run a query with non-time ordering, then they will get an error
like "Cannot execute query with orderBy [quality ASC]".

* SQL module fixes.

* Add spotbugs-exclude.

* Remove unused method.
This commit is contained in:
Gian Merlino 2021-11-19 08:19:12 -08:00 committed by GitHub
parent 3c51136098
commit 36ee0367ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 450 additions and 84 deletions

View File

@ -34,6 +34,7 @@
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
<Or>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanRowsLimitJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanTimeOrderJsonIncludeFilter"/>
<Class name="org.apache.druid.query.groupby.orderby.DefaultLimitSpec$LimitJsonIncludeFilter"/>
</Or>
</And>

View File

@ -279,6 +279,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
0,
null,
null,
null,
columns,
false,
null

View File

@ -796,6 +796,7 @@ public class Druids
private List<String> columns;
private Boolean legacy;
private ScanQuery.Order order;
private List<ScanQuery.OrderBy> orderBy;
public ScanQueryBuilder()
{
@ -811,6 +812,7 @@ public class Druids
columns = new ArrayList<>();
legacy = null;
order = null;
orderBy = null;
}
public ScanQuery build()
@ -824,6 +826,7 @@ public class Druids
offset,
limit,
order,
orderBy,
dimFilter,
columns,
legacy,
@ -845,7 +848,7 @@ public class Druids
.columns(query.getColumns())
.legacy(query.isLegacy())
.context(query.getContext())
.order(query.getOrder());
.orderBy(query.getOrderBys());
}
public ScanQueryBuilder dataSource(String ds)
@ -936,6 +939,12 @@ public class Druids
this.order = order;
return this;
}
public ScanQueryBuilder orderBy(List<ScanQuery.OrderBy> orderBys)
{
this.orderBy = orderBys;
return this;
}
}
public static ScanQueryBuilder newScanQueryBuilder()

View File

@ -25,14 +25,17 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
@ -86,6 +89,63 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
}
}
public static class OrderBy
{
private final String columnName;
private final Order order;
@JsonCreator
public OrderBy(
@JsonProperty("columnName") final String columnName,
@JsonProperty("order") final Order order
)
{
this.columnName = Preconditions.checkNotNull(columnName, "columnName");
this.order = Preconditions.checkNotNull(order, "order");
if (order == Order.NONE) {
throw new IAE("Order required for column [%s]", columnName);
}
}
@JsonProperty
public String getColumnName()
{
return columnName;
}
@JsonProperty
public Order getOrder()
{
return order;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OrderBy that = (OrderBy) o;
return Objects.equals(columnName, that.columnName) && order == that.order;
}
@Override
public int hashCode()
{
return Objects.hash(columnName, order);
}
@Override
public String toString()
{
return StringUtils.format("%s %s", columnName, order == Order.ASCENDING ? "ASC" : "DESC");
}
}
public enum Order
{
ASCENDING,
@ -120,7 +180,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
private final DimFilter dimFilter;
private final List<String> columns;
private final Boolean legacy;
private final Order order;
private final Order timeOrder;
private final List<OrderBy> orderBys;
private final Integer maxRowsQueuedForOrdering;
private final Integer maxSegmentPartitionsOrderedInMemory;
@ -133,7 +194,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@JsonProperty("batchSize") int batchSize,
@JsonProperty("offset") long scanRowsOffset,
@JsonProperty("limit") long scanRowsLimit,
@JsonProperty("order") Order order,
@JsonProperty("order") Order orderFromUser,
@JsonProperty("orderBy") List<OrderBy> orderBysFromUser,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("legacy") Boolean legacy,
@ -161,17 +223,43 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
this.dimFilter = dimFilter;
this.columns = columns;
this.legacy = legacy;
this.order = (order == null) ? Order.NONE : order;
if (this.order != Order.NONE) {
Preconditions.checkArgument(
columns == null || columns.size() == 0 || columns.contains(ColumnHolder.TIME_COLUMN_NAME),
"The __time column must be selected if the results are time-ordered."
);
final Pair<List<OrderBy>, Order> ordering = verifyAndReconcileOrdering(orderBysFromUser, orderFromUser);
this.orderBys = Preconditions.checkNotNull(ordering.lhs);
this.timeOrder = ordering.rhs;
if (this.columns != null && this.columns.size() > 0) {
// Validate orderBy. (Cannot validate when signature is empty, since that means "discover at runtime".)
for (final OrderBy orderByColumn : this.orderBys) {
if (!this.columns.contains(orderByColumn.getColumnName())) {
// Error message depends on how the user originally specified ordering.
if (orderBysFromUser != null) {
throw new IAE("Column [%s] from 'orderBy' must also appear in 'columns'.", orderByColumn.getColumnName());
} else {
throw new IllegalArgumentException("The __time column must be selected if the results are time-ordered.");
}
}
}
}
this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
}
/**
* Verifies that the ordering of a query is solely determined by {@link #getTimeOrder()}. Required to actually
* execute queries, because {@link #getOrderBys()} is not yet understood by the query engines.
*
* @throws IllegalStateException if the ordering is not solely determined by {@link #getTimeOrder()}
*/
public static void verifyOrderByForNativeExecution(final ScanQuery query)
{
if (query.getTimeOrder() == Order.NONE && !query.getOrderBys().isEmpty()) {
throw new ISE("Cannot execute query with orderBy %s", query.getOrderBys());
}
}
private Integer validateAndGetMaxRowsQueuedForOrdering()
{
final Integer maxRowsQueuedForOrdering =
@ -245,10 +333,37 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return scanRowsLimit != Long.MAX_VALUE;
}
@JsonProperty
public Order getOrder()
/**
* If this query is purely-time-ordered, returns a value of the enum {@link Order}. Otherwise, returns
* {@link Order#NONE}. If the returned value is {@link Order#NONE} it may not agree with {@link #getOrderBys()}.
*/
@JsonProperty("order")
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = ScanTimeOrderJsonIncludeFilter.class)
public Order getTimeOrder()
{
return order;
return timeOrder;
}
public List<OrderBy> getOrderBys()
{
return orderBys;
}
@Nullable
@JsonProperty("orderBy")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
List<OrderBy> getOrderBysForJson()
{
// Return "orderBy" if necessary (meaning: if it is nonempty and nontime). Prevents polluting JSONs with
// redundant "orderBy" and "order" fields.
if (orderBys.size() > 1
|| (orderBys.size() == 1
&& !Iterables.getOnlyElement(orderBys).getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME))) {
return orderBys;
} else {
return null;
}
}
@Nullable
@ -272,6 +387,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
}
@Override
@Nullable
@JsonProperty
public DimFilter getFilter()
{
@ -284,7 +400,9 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return SCAN;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public List<String> getColumns()
{
return columns;
@ -293,7 +411,9 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
/**
* Compatibility mode with the legacy scan-query extension.
*/
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Boolean isLegacy()
{
return legacy;
@ -302,12 +422,15 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@Override
public Ordering<ScanResultValue> getResultOrdering()
{
if (order == Order.NONE) {
// No support yet for actually executing queries with non-time orderBy.
verifyOrderByForNativeExecution(this);
if (timeOrder == Order.NONE) {
return Ordering.natural();
}
return Ordering.from(
new ScanResultValueTimestampComparator(this).thenComparing(
order == Order.ASCENDING
timeOrder == Order.ASCENDING
? Comparator.naturalOrder()
: Comparator.<ScanResultValue>naturalOrder().reversed()
)
@ -348,19 +471,19 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
}
@Override
public Query<ScanResultValue> withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
public ScanQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return Druids.ScanQueryBuilder.copy(this).intervals(querySegmentSpec).build();
}
@Override
public Query<ScanResultValue> withDataSource(DataSource dataSource)
public ScanQuery withDataSource(DataSource dataSource)
{
return Druids.ScanQueryBuilder.copy(this).dataSource(dataSource).build();
}
@Override
public Query<ScanResultValue> withOverriddenContext(Map<String, Object> contextOverrides)
public ScanQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
}
@ -385,7 +508,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
Objects.equals(virtualColumns, scanQuery.virtualColumns) &&
Objects.equals(resultFormat, scanQuery.resultFormat) &&
Objects.equals(dimFilter, scanQuery.dimFilter) &&
Objects.equals(columns, scanQuery.columns);
Objects.equals(columns, scanQuery.columns) &&
Objects.equals(orderBys, scanQuery.orderBys);
}
@Override
@ -400,6 +524,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
scanRowsLimit,
dimFilter,
columns,
orderBys,
legacy
);
}
@ -417,10 +542,92 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
", limit=" + scanRowsLimit +
", dimFilter=" + dimFilter +
", columns=" + columns +
", legacy=" + legacy +
(orderBys.isEmpty() ? "" : ", orderBy=" + orderBys) +
(legacy == null ? "" : ", legacy=" + legacy) +
", context=" + getContext() +
'}';
}
/**
* Verify and reconcile the two ways of specifying ordering: "orderBy", which can refer to any column, and
* "order", which refers to the __time column.
*
* If only "order" is provided, it is returned as-is, along with an equivalent "orderBy".
*
* If only "orderBy" is provided, it is returned as-is. If it can be converted into an equivalent "order", then that
* equivalent "order" is also returned. Otherwise, "orderBy" is returned as-is and "order" is returned as NONE.
*
* If both "orderBy" and "order" are provided, this returns them as-is if they are compatible, or throws an
* exception if they are incompatible.
*
* @param orderByFromUser "orderBy" specified by the user (can refer to any column)
* @param orderFromUser "order" specified by the user (refers to time order)
*/
private static Pair<List<OrderBy>, Order> verifyAndReconcileOrdering(
@Nullable final List<OrderBy> orderByFromUser,
@Nullable final Order orderFromUser
)
{
final List<OrderBy> orderByRetVal;
final Order orderRetVal;
// Compute the returned orderBy.
if (orderByFromUser != null) {
orderByRetVal = orderByFromUser;
} else if (orderFromUser == null || orderFromUser == Order.NONE) {
orderByRetVal = Collections.emptyList();
} else {
orderByRetVal = Collections.singletonList(new OrderBy(ColumnHolder.TIME_COLUMN_NAME, orderFromUser));
}
// Compute the returned order.
orderRetVal = computeTimeOrderFromOrderBys(orderByRetVal);
// Verify compatibility, if the user specified both kinds of ordering.
if (orderFromUser != null && orderFromUser != Order.NONE && orderRetVal != orderFromUser) {
throw new IAE("Cannot provide 'order' incompatible with 'orderBy'");
}
return Pair.of(orderByRetVal, orderRetVal);
}
/**
* Compute time ordering based on a list of orderBys.
*
* Returns {@link Order#ASCENDING} or {@link Order#DESCENDING} if the ordering is time-based; returns
* {@link Order#NONE} otherwise. Importantly, this means that the returned order is not necessarily compatible
* with the input orderBys.
*/
@Nullable
private static Order computeTimeOrderFromOrderBys(final List<OrderBy> orderBys)
{
if (orderBys.size() == 1) {
final OrderBy orderByColumn = Iterables.getOnlyElement(orderBys);
if (ColumnHolder.TIME_COLUMN_NAME.equals(orderByColumn.getColumnName())) {
return orderByColumn.getOrder();
}
}
return Order.NONE;
}
/**
* {@link JsonInclude} filter for {@link #getTimeOrder()}.
*
* This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
* exclusions (see spotbugs-exclude.xml).
*/
@SuppressWarnings({"EqualsAndHashcode", "EqualsHashCode"})
static class ScanTimeOrderJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
{
@Override
public boolean equals(Object obj)
{
return obj instanceof Order && Order.NONE.equals(obj);
}
}
/**
* {@link JsonInclude} filter for {@link #getScanRowsLimit()}.
*
@ -433,14 +640,6 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@Override
public boolean equals(Object obj)
{
if (obj == null) {
return false;
}
if (obj.getClass() == this.getClass()) {
return true;
}
return obj instanceof Long && (long) obj == Long.MAX_VALUE;
}
}

View File

@ -70,7 +70,7 @@ public class ScanQueryEngine
final Object numScannedRows = responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS);
if (numScannedRows != null) {
long count = (long) numScannedRows;
if (count >= query.getScanRowsLimit() && query.getOrder().equals(ScanQuery.Order.NONE)) {
if (count >= query.getScanRowsLimit() && query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
return Sequences.empty();
}
}
@ -131,8 +131,8 @@ public class ScanQueryEngine
intervals.get(0),
query.getVirtualColumns(),
Granularities.ALL,
query.getOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
null
)
.map(cursor -> new BaseSequence<>(
@ -261,7 +261,7 @@ public class ScanQueryEngine
*/
private long calculateRemainingScanRowsLimit(ScanQuery query, ResponseContext responseContext)
{
if (query.getOrder().equals(ScanQuery.Order.NONE)) {
if (query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
return query.getScanRowsLimit() - (long) responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS);
}
return query.getScanRowsLimit();

View File

@ -57,7 +57,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
private long count = 0;
private ScanQuery query;
public ScanQueryLimitRowIterator(
ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner,
QueryPlus<ScanResultValue> queryPlus,
ResponseContext responseContext
@ -98,7 +98,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
// We want to perform multi-event ScanResultValue limiting if we are not time-ordering or are at the
// inner-level if we are time-ordering
if (query.getOrder() == ScanQuery.Order.NONE ||
if (query.getTimeOrder() == ScanQuery.Order.NONE ||
!query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true)) {
ScanResultValue batch = yielder.get();
List events = (List) batch.getEvents();

View File

@ -66,13 +66,11 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{
return (queryPlus, responseContext) -> {
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
// the same way, even if they have different default legacy values.
//
// Also, remove "offset" and add it to the "limit" (we won't push the offset down, just apply it here, at the
// merge at the top of the stack).
final ScanQuery originalQuery = ((ScanQuery) (queryPlus.getQuery()));
ScanQuery.verifyOrderByForNativeExecution(originalQuery);
// Remove "offset" and add it to the "limit" (we won't push the offset down, just apply it here, at the
// merge at the top of the stack).
final long newLimit;
if (!originalQuery.isLimited()) {
// Unlimited stays unlimited.
@ -87,6 +85,8 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
newLimit = originalQuery.getScanRowsLimit() + originalQuery.getScanRowsOffset();
}
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
// the same way, even if they have different default legacy values.
final ScanQuery queryToRun = originalQuery.withNonNullLegacy(scanQueryConfig)
.withOffset(0)
.withLimit(newLimit);

View File

@ -90,13 +90,14 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// in single thread and in jetty thread instead of processing thread
return (queryPlus, responseContext) -> {
ScanQuery query = (ScanQuery) queryPlus.getQuery();
ScanQuery.verifyOrderByForNativeExecution(query);
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of ResponseContext.Key.TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(ResponseContext.Key.TIMEOUT_AT, timeoutAt);
if (query.getOrder().equals(ScanQuery.Order.NONE)) {
if (query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
// Use normal strategy
Sequence<ScanResultValue> returnedRows = Sequences.concat(
Sequences.map(
@ -113,7 +114,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
List<Interval> intervalsOrdered = getIntervalsFromSpecificQuerySpec(query.getQuerySegmentSpec());
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
if (ScanQuery.Order.DESCENDING.equals(query.getTimeOrder())) {
intervalsOrdered = Lists.reverse(intervalsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
@ -365,6 +366,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
}
ScanQuery.verifyOrderByForNativeExecution((ScanQuery) query);
// it happens in unit tests
final Number timeoutAt = (Number) responseContext.get(ResponseContext.Key.TIMEOUT_AT);
if (timeoutAt == null || timeoutAt.longValue() == 0L) {

View File

@ -45,7 +45,7 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResult
int comparison = Longs.compare(
o1.getFirstEventTimestamp(scanQuery.getResultFormat()),
o2.getFirstEventTimestamp(scanQuery.getResultFormat()));
if (scanQuery.getOrder().equals(ScanQuery.Order.ASCENDING)) {
if (scanQuery.getTimeOrder().equals(ScanQuery.Order.ASCENDING)) {
return comparison;
}
return comparison * -1;

View File

@ -211,7 +211,6 @@ public class MultiSegmentScanQueryTest extends NullHandlingTest
.toList();
int totalCount = 0;
for (ScanResultValue result : results) {
System.out.println(((List) result.getEvents()).size());
totalCount += ((List) result.getEvents()).size();
}
Assert.assertEquals(

View File

@ -152,7 +152,7 @@ public class ScanQueryRunnerFactoryTest
} else if (o1 < o2) {
retVal = -1;
}
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
if (query.getTimeOrder().equals(ScanQuery.Order.DESCENDING)) {
return retVal * -1;
}
return retVal;
@ -205,7 +205,7 @@ public class ScanQueryRunnerFactoryTest
scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
if (query.getOrder() == ScanQuery.Order.DESCENDING) {
if (query.getTimeOrder() == ScanQuery.Order.DESCENDING) {
Collections.reverse(scanResultValues1);
Collections.reverse(scanResultValues2);
Collections.reverse(scanResultValues3);
@ -226,7 +226,7 @@ public class ScanQueryRunnerFactoryTest
List<List<QueryRunner<ScanResultValue>>> groupedRunners = new ArrayList<>(2);
if (query.getOrder() == ScanQuery.Order.DESCENDING) {
if (query.getTimeOrder() == ScanQuery.Order.DESCENDING) {
groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
} else {
@ -241,7 +241,7 @@ public class ScanQueryRunnerFactoryTest
} else if (o1 < o2) {
retVal = -1;
}
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
if (query.getTimeOrder().equals(ScanQuery.Order.DESCENDING)) {
return retVal * -1;
}
return retVal;
@ -273,7 +273,7 @@ public class ScanQueryRunnerFactoryTest
// check ordering is correct
for (int i = 1; i < output.size(); i++) {
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
if (query.getTimeOrder().equals(ScanQuery.Order.DESCENDING)) {
Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) <
output.get(i - 1).getFirstEventTimestamp(resultFormat));
} else {

View File

@ -31,6 +31,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class ScanQuerySpecTest
{
@ -54,10 +55,8 @@ public class ScanQuerySpecTest
+ "\"resultFormat\":\"list\","
+ "\"batchSize\":20480,"
+ "\"limit\":3,"
+ "\"order\":\"none\","
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
+ "\"legacy\":null,"
+ "\"context\":null,"
+ "\"descending\":false,"
+ "\"granularity\":{\"type\":\"all\"}}";
@ -72,6 +71,7 @@ public class ScanQuerySpecTest
3,
ScanQuery.Order.NONE,
null,
null,
Arrays.asList("market", "quality", "index"),
null,
null
@ -83,6 +83,92 @@ public class ScanQuerySpecTest
Assert.assertEquals(query, JSON_MAPPER.readValue(legacy, ScanQuery.class));
}
@Test
public void testSerializationWithTimeOrder() throws Exception
{
String originalJson =
"{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"},"
+ "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]},"
+ "\"virtualColumns\":[],"
+ "\"resultFormat\":\"list\","
+ "\"batchSize\":20480,"
+ "\"limit\":3,"
+ "\"order\":\"ascending\","
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\",\"__time\"],"
+ "\"context\":null,"
+ "\"descending\":false,"
+ "\"granularity\":{\"type\":\"all\"}}";
ScanQuery expectedQuery = new ScanQuery(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")),
VirtualColumns.EMPTY,
ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
0,
0,
3,
ScanQuery.Order.ASCENDING,
null,
null,
Arrays.asList("market", "quality", "index", "__time"),
null,
null
);
String serializedJson = JSON_MAPPER.writeValueAsString(expectedQuery);
Assert.assertEquals(originalJson, serializedJson);
Assert.assertEquals(expectedQuery, JSON_MAPPER.readValue(originalJson, ScanQuery.class));
Assert.assertEquals(ScanQuery.Order.ASCENDING, expectedQuery.getTimeOrder());
Assert.assertEquals(
Collections.singletonList(new ScanQuery.OrderBy("__time", ScanQuery.Order.ASCENDING)),
expectedQuery.getOrderBys()
);
}
@Test
public void testSerializationWithOrderBy() throws Exception
{
String originalJson =
"{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"},"
+ "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]},"
+ "\"virtualColumns\":[],"
+ "\"resultFormat\":\"list\","
+ "\"batchSize\":20480,"
+ "\"limit\":3,"
+ "\"orderBy\":[{\"columnName\":\"quality\",\"order\":\"ascending\"}],"
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\",\"__time\"],"
+ "\"context\":null,"
+ "\"descending\":false,"
+ "\"granularity\":{\"type\":\"all\"}}";
ScanQuery expectedQuery = new ScanQuery(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")),
VirtualColumns.EMPTY,
ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
0,
0,
3,
null,
Collections.singletonList(new ScanQuery.OrderBy("quality", ScanQuery.Order.ASCENDING)),
null,
Arrays.asList("market", "quality", "index", "__time"),
null,
null
);
String serializedJson = JSON_MAPPER.writeValueAsString(expectedQuery);
Assert.assertEquals(originalJson, serializedJson);
Assert.assertEquals(expectedQuery, JSON_MAPPER.readValue(originalJson, ScanQuery.class));
Assert.assertEquals(ScanQuery.Order.NONE, expectedQuery.getTimeOrder());
Assert.assertEquals(
Collections.singletonList(new ScanQuery.OrderBy("quality", ScanQuery.Order.ASCENDING)),
expectedQuery.getOrderBys()
);
}
@Test
public void testSerializationLegacyString() throws Exception
{
@ -96,6 +182,7 @@ public class ScanQuerySpecTest
3,
ScanQuery.Order.NONE,
null,
null,
Arrays.asList("market", "quality", "index"),
null,
null

View File

@ -114,6 +114,43 @@ public class ScanQueryTest
.build();
}
@Test
public void testConflictingOrderByAndTimeOrder()
{
Assert.assertThrows(
"Cannot provide 'order' incompatible with 'orderBy'",
IllegalArgumentException.class,
() ->
Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)
.orderBy(
// Not ok, even though it starts with __time ASC, because it also has non-time component.
ImmutableList.of(
new ScanQuery.OrderBy("__time", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("quality", ScanQuery.Order.DESCENDING)
)
)
.columns(ImmutableList.of("__time", "quality"))
.dataSource("source")
.intervals(intervalSpec)
.build()
);
}
@Test
public void testCompatibleOrderByAndTimeOrder()
{
Assert.assertNotNull(
Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)
.orderBy(ImmutableList.of(new ScanQuery.OrderBy("__time", ScanQuery.Order.ASCENDING)))
.columns(ImmutableList.of("__time", "quality"))
.dataSource("source")
.intervals(intervalSpec)
.build()
);
}
// No assertions because we're checking that no IllegalArgumentExceptions are thrown
@Test
public void testValidScanQueryInitialization()
@ -270,6 +307,37 @@ public class ScanQueryTest
List<ScanResultValue> res = borkedSequence.toList();
}
@Test
public void testGetResultOrderingWithTimeBasedOrderBy()
{
final ScanQuery scanQuery =
Druids.newScanQueryBuilder()
.columns("__time")
.orderBy(Collections.singletonList(new ScanQuery.OrderBy("__time", ScanQuery.Order.DESCENDING)))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
Assert.assertNotNull(scanQuery.getResultOrdering());
}
@Test
public void testGetResultOrderingWithNonTimeOrderBy()
{
// Queries with non-time order cannot currently be executed
final ScanQuery scanQuery =
Druids.newScanQueryBuilder()
.columns("quality")
.orderBy(Collections.singletonList(new ScanQuery.OrderBy("quality", ScanQuery.Order.ASCENDING)))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
Assert.assertThrows("Cannot execute query with orderBy [quality ASC]", ISE.class, scanQuery::getResultOrdering);
}
@Test
public void testGetRequiredColumnsWithNoColumns()
{

View File

@ -1154,6 +1154,7 @@ public class DruidQuery
scanOffset,
scanLimit,
order,
null,
filtration.getDimFilter(),
Ordering.natural().sortedCopy(columns),
false,

View File

@ -1076,7 +1076,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of(),
ImmutableList.of(
new Object[]{
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"inline\",\"columnNames\":[\"EXPR$0\"],\"columnTypes\":[\"LONG\"],\"rows\":[[2]]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"order\":\"none\",\"filter\":null,\"columns\":[\"EXPR$0\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{EXPR$0:LONG}])\n",
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"inline\",\"columnNames\":[\"EXPR$0\"],\"columnTypes\":[\"LONG\"],\"rows\":[[2]]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":null,\"columns\":[\"EXPR$0\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{EXPR$0:LONG}])\n",
"[]"
}
)
@ -1492,7 +1492,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of(),
ImmutableList.of(
new Object[]{
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX<hyperUnique>}])\n",
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX<hyperUnique>}])\n",
"[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"
}
)
@ -1847,23 +1847,22 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Intervals.of("1990-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z")))
.columns(
ImmutableList.<String>builder()
.add("__time")
.add("count")
.add("dimHyperUnique")
.add("dimMultivalEnumerated")
.add("dimMultivalEnumerated2")
.add("dimMultivalSequentialWithNulls")
.add("dimSequential")
.add("dimSequentialHalfNull")
.add("dimUniform")
.add("dimZipf")
.add("metFloatNormal")
.add("metFloatZipf")
.add("metLongSequential")
.build()
.add("__time")
.add("count")
.add("dimHyperUnique")
.add("dimMultivalEnumerated")
.add("dimMultivalEnumerated2")
.add("dimMultivalSequentialWithNulls")
.add("dimSequential")
.add("dimSequentialHalfNull")
.add("dimUniform")
.add("dimZipf")
.add("metFloatNormal")
.add("metFloatZipf")
.add("metLongSequential")
.build()
)
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(OUTER_LIMIT_CONTEXT)
.build()
@ -1915,24 +1914,23 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Intervals.of("1990-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z")))
.columns(
ImmutableList.<String>builder()
.add("__time")
.add("count")
.add("dimHyperUnique")
.add("dimMultivalEnumerated")
.add("dimMultivalEnumerated2")
.add("dimMultivalSequentialWithNulls")
.add("dimSequential")
.add("dimSequentialHalfNull")
.add("dimUniform")
.add("dimZipf")
.add("metFloatNormal")
.add("metFloatZipf")
.add("metLongSequential")
.add("metLongUniform")
.build()
.add("__time")
.add("count")
.add("dimHyperUnique")
.add("dimMultivalEnumerated")
.add("dimMultivalEnumerated2")
.add("dimMultivalSequentialWithNulls")
.add("dimSequential")
.add("dimSequentialHalfNull")
.add("dimUniform")
.add("dimZipf")
.add("metFloatNormal")
.add("metFloatZipf")
.add("metLongSequential")
.add("metLongUniform")
.build()
)
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(OUTER_LIMIT_CONTEXT)
.build()
@ -9072,7 +9070,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
final String explanation =
"DruidOuterQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"__subquery__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"limit\":2147483647,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}}], signature=[{a0:LONG}])\n"
+ " DruidJoinQueryRel(condition=[=(SUBSTRING($3, 1, 1), $8)], joinType=[inner], query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"__join__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim2\",\"outputName\":\"d0\",\"outputType\":\"STRING\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX<hyperUnique>}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX<hyperUnique>}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":null,\"extractionFn\":null}},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"extraction\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\",\"extractionFn\":{\"type\":\"substring\",\"index\":0,\"length\":1}}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n";
final String resources = "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";