Add "offset" parameter to the Scan query. (#10233)

* Add "offset" parameter to the Scan query.

It works by doing the query as normal and then throwing away the first
"offset" number of rows on the broker.

* Fix constructor call.

* Fix up JSONs.

* Fix call to ScanQuery.

* Doc update.

* Fix javadocs.

* Spotbugs, LGTM suppressions.

* Javadocs.

* Fix suppression.

* Stabilize Scan query result order, add tests.

* Update LGTM comment.

* Fixup.

* Test different batch sizes too.

* Nicer tests.

* Fix comment.
Authored by Gian Merlino on 2020-08-13 14:56:24 -07:00; committed by GitHub.
Parent: e053348f74
Commit: 6cca7242de
20 changed files with 892 additions and 106 deletions


@ -33,6 +33,7 @@
<And> <And>
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/> <Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
<Or> <Or>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanRowsLimitJsonIncludeFilter"/>
<Class name="org.apache.druid.query.groupby.orderby.DefaultLimitSpec$LimitJsonIncludeFilter"/> <Class name="org.apache.druid.query.groupby.orderby.DefaultLimitSpec$LimitJsonIncludeFilter"/>
</Or> </Or>
</And> </And>


@ -22,6 +22,7 @@ package org.apache.druid.collections;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
@ -40,11 +41,13 @@ import java.util.Objects;
public class StableLimitingSorter<T> public class StableLimitingSorter<T>
{ {
private final MinMaxPriorityQueue<NumberedElement<T>> queue; private final MinMaxPriorityQueue<NumberedElement<T>> queue;
private final int limit;
private long count = 0; private long size = 0;
public StableLimitingSorter(final Comparator<T> comparator, final int limit) public StableLimitingSorter(final Comparator<T> comparator, final int limit)
{ {
this.limit = limit;
this.queue = MinMaxPriorityQueue this.queue = MinMaxPriorityQueue
.orderedBy( .orderedBy(
Ordering.from( Ordering.from(
@ -61,7 +64,15 @@ public class StableLimitingSorter<T>
*/ */
public void add(T element) public void add(T element)
{ {
queue.offer(new NumberedElement<>(element, count++)); queue.offer(new NumberedElement<>(element, size++));
}
/**
* Returns the number of elements currently in the sorter.
*/
public int size()
{
return Ints.checkedCast(Math.min(size, limit));
} }
/** /**


@ -62,6 +62,7 @@ The following are the main parameters for Scan queries:
|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
|batchSize|The maximum number of rows buffered before being returned to the client. Default is `20480`|no| |batchSize|The maximum number of rows buffered before being returned to the client. Default is `20480`|no|
|limit|How many rows to return. If not specified, all rows will be returned.|no| |limit|How many rows to return. If not specified, all rows will be returned.|no|
|offset|Skip this many rows when returning results. Skipped rows will still need to be generated internally and then discarded, meaning that raising offsets to high values can cause queries to use additional resources.<br /><br />Together, "limit" and "offset" can be used to implement pagination. However, note that if the underlying datasource is modified in between page fetches in ways that affect overall query results, then the different pages will not necessarily align with each other.|no|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met.|none| |order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met.|none|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no| |legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags (see the `query context properties` section below).|no| |context|An additional JSON Object which can be used to specify certain flags (see the `query context properties` section below).|no|
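
To illustrate the pagination pattern mentioned in the "offset" row above, a rough Java sketch follows (the page size, datasource, and columns are made up; it assumes the underlying data does not change between fetches):

    int pageSize = 50;
    for (int page = 0; page < 3; page++) {
      ScanQuery pageQuery = Druids.newScanQueryBuilder()
                                  .dataSource("wikipedia")         // hypothetical datasource
                                  .intervals(new MultipleIntervalSegmentSpec(
                                      Collections.singletonList(Intervals.of("2020-01-01/2020-01-02"))))
                                  .columns("__time", "page")       // hypothetical columns
                                  .offset((long) page * pageSize)  // page 0 -> 0, page 1 -> 50, page 2 -> 100
                                  .limit(pageSize)
                                  .build();
      // run pageQuery and render this page of results ...
    }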


@ -25,5 +25,3 @@ sidebar_label: "Select"
Older versions of Apache Druid included a Select query type. Since Druid 0.17.0, it has been removed and replaced by the [Scan query](../querying/scan-query.md), which offers improved memory usage and performance. This solves issues that users had with Select queries causing Druid to run out of memory or slow down. Older versions of Apache Druid included a Select query type. Since Druid 0.17.0, it has been removed and replaced by the [Scan query](../querying/scan-query.md), which offers improved memory usage and performance. This solves issues that users had with Select queries causing Druid to run out of memory or slow down.
The Scan query has a different syntax, but supports many of the features of the Select query, including time ordering and limiting. Scan does not include the Select query's pagination feature; however, in many cases pagination is unnecessary with Scan due to its ability to return a virtually unlimited number of results in one call.


@ -269,6 +269,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
null, null,
0, 0,
0, 0,
0,
null, null,
null, null,
columns, columns,


@ -63,6 +63,7 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
/** /**
*
*/ */
public class Druids public class Druids
{ {
@ -789,6 +790,7 @@ public class Druids
private Map<String, Object> context; private Map<String, Object> context;
private ScanQuery.ResultFormat resultFormat; private ScanQuery.ResultFormat resultFormat;
private int batchSize; private int batchSize;
private long offset;
private long limit; private long limit;
private DimFilter dimFilter; private DimFilter dimFilter;
private List<String> columns; private List<String> columns;
@ -803,6 +805,7 @@ public class Druids
context = null; context = null;
resultFormat = null; resultFormat = null;
batchSize = 0; batchSize = 0;
offset = 0;
limit = 0; limit = 0;
dimFilter = null; dimFilter = null;
columns = new ArrayList<>(); columns = new ArrayList<>();
@ -818,6 +821,7 @@ public class Druids
virtualColumns, virtualColumns,
resultFormat, resultFormat,
batchSize, batchSize,
offset,
limit, limit,
order, order,
dimFilter, dimFilter,
@ -835,6 +839,7 @@ public class Druids
.virtualColumns(query.getVirtualColumns()) .virtualColumns(query.getVirtualColumns())
.resultFormat(query.getResultFormat()) .resultFormat(query.getResultFormat())
.batchSize(query.getBatchSize()) .batchSize(query.getBatchSize())
.offset(query.getScanRowsOffset())
.limit(query.getScanRowsLimit()) .limit(query.getScanRowsLimit())
.filters(query.getFilter()) .filters(query.getFilter())
.columns(query.getColumns()) .columns(query.getColumns())
@ -890,6 +895,12 @@ public class Druids
return this; return this;
} }
public ScanQueryBuilder offset(long o)
{
offset = o;
return this;
}
public ScanQueryBuilder limit(long l) public ScanQueryBuilder limit(long l)
{ {
limit = l; limit = l;


@ -21,6 +21,7 @@ package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -37,6 +38,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -110,7 +112,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
private final VirtualColumns virtualColumns; private final VirtualColumns virtualColumns;
private final ResultFormat resultFormat; private final ResultFormat resultFormat;
private final int batchSize; private final int batchSize;
@JsonProperty("limit") private final long scanRowsOffset;
private final long scanRowsLimit; private final long scanRowsLimit;
private final DimFilter dimFilter; private final DimFilter dimFilter;
private final List<String> columns; private final List<String> columns;
@ -126,6 +128,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@JsonProperty("virtualColumns") VirtualColumns virtualColumns, @JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("resultFormat") ResultFormat resultFormat, @JsonProperty("resultFormat") ResultFormat resultFormat,
@JsonProperty("batchSize") int batchSize, @JsonProperty("batchSize") int batchSize,
@JsonProperty("offset") long scanRowsOffset,
@JsonProperty("limit") long scanRowsLimit, @JsonProperty("limit") long scanRowsLimit,
@JsonProperty("order") Order order, @JsonProperty("order") Order order,
@JsonProperty("filter") DimFilter dimFilter, @JsonProperty("filter") DimFilter dimFilter,
@ -142,6 +145,11 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
this.batchSize > 0, this.batchSize > 0,
"batchSize must be greater than 0" "batchSize must be greater than 0"
); );
this.scanRowsOffset = scanRowsOffset;
Preconditions.checkArgument(
this.scanRowsOffset >= 0,
"offset must be greater than or equal to 0"
);
this.scanRowsLimit = (scanRowsLimit == 0) ? Long.MAX_VALUE : scanRowsLimit; this.scanRowsLimit = (scanRowsLimit == 0) ? Long.MAX_VALUE : scanRowsLimit;
Preconditions.checkArgument( Preconditions.checkArgument(
this.scanRowsLimit > 0, this.scanRowsLimit > 0,
@ -202,12 +210,38 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return batchSize; return batchSize;
} }
@JsonProperty /**
* Offset for this query; behaves like SQL "OFFSET". Zero means no offset. Negative values are invalid.
*/
@JsonProperty("offset")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public long getScanRowsOffset()
{
return scanRowsOffset;
}
/**
* Limit for this query; behaves like SQL "LIMIT". Will always be positive. {@link Long#MAX_VALUE} is used in
* situations where the user wants an effectively unlimited resultset.
*/
@JsonProperty("limit")
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = ScanRowsLimitJsonIncludeFilter.class)
public long getScanRowsLimit() public long getScanRowsLimit()
{ {
return scanRowsLimit; return scanRowsLimit;
} }
/**
* Returns whether this query is limited or not. Because {@link Long#MAX_VALUE} is used to signify unlimitedness,
* this is equivalent to {@code getScanRowsLimit() != Long.MAX_VALUE}.
*
* @see #getScanRowsLimit()
*/
public boolean isLimited()
{
return scanRowsLimit != Long.MAX_VALUE;
}
@JsonProperty @JsonProperty
public Order getOrder() public Order getOrder()
{ {
@ -268,7 +302,23 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
if (order == Order.NONE) { if (order == Order.NONE) {
return Ordering.natural(); return Ordering.natural();
} }
return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse(); return Ordering.from(
new ScanResultValueTimestampComparator(this).thenComparing(
order == Order.ASCENDING
? Comparator.naturalOrder()
: Comparator.<ScanResultValue>naturalOrder().reversed()
)
);
}
public ScanQuery withOffset(final long newOffset)
{
return Druids.ScanQueryBuilder.copy(this).offset(newOffset).build();
}
public ScanQuery withLimit(final long newLimit)
{
return Druids.ScanQueryBuilder.copy(this).limit(newLimit).build();
} }
public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig) public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig)
@ -295,7 +345,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
} }
@Override @Override
public boolean equals(final Object o) public boolean equals(Object o)
{ {
if (this == o) { if (this == o) {
return true; return true;
@ -308,6 +358,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
} }
final ScanQuery scanQuery = (ScanQuery) o; final ScanQuery scanQuery = (ScanQuery) o;
return batchSize == scanQuery.batchSize && return batchSize == scanQuery.batchSize &&
scanRowsOffset == scanQuery.scanRowsOffset &&
scanRowsLimit == scanQuery.scanRowsLimit && scanRowsLimit == scanQuery.scanRowsLimit &&
Objects.equals(legacy, scanQuery.legacy) && Objects.equals(legacy, scanQuery.legacy) &&
Objects.equals(virtualColumns, scanQuery.virtualColumns) && Objects.equals(virtualColumns, scanQuery.virtualColumns) &&
@ -319,8 +370,17 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(super.hashCode(), virtualColumns, resultFormat, batchSize, return Objects.hash(
scanRowsLimit, dimFilter, columns, legacy); super.hashCode(),
virtualColumns,
resultFormat,
batchSize,
scanRowsOffset,
scanRowsLimit,
dimFilter,
columns,
legacy
);
} }
@Override @Override
@ -332,10 +392,35 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
", virtualColumns=" + getVirtualColumns() + ", virtualColumns=" + getVirtualColumns() +
", resultFormat='" + resultFormat + '\'' + ", resultFormat='" + resultFormat + '\'' +
", batchSize=" + batchSize + ", batchSize=" + batchSize +
", scanRowsLimit=" + scanRowsLimit + ", offset=" + scanRowsOffset +
", limit=" + scanRowsLimit +
", dimFilter=" + dimFilter + ", dimFilter=" + dimFilter +
", columns=" + columns + ", columns=" + columns +
", legacy=" + legacy + ", legacy=" + legacy +
'}'; '}';
} }
/**
* {@link JsonInclude} filter for {@link #getScanRowsLimit()}.
*
* This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
* exclusions (see spotbugs-exclude.xml).
*/
@SuppressWarnings({"EqualsAndHashcode"})
static class ScanRowsLimitJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
{
@Override
public boolean equals(Object obj)
{
if (obj == null) {
return false;
}
if (obj.getClass() == this.getClass()) {
return true;
}
return obj instanceof Long && (long) obj == Long.MAX_VALUE;
}
}
} }


@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.DelegatingYieldingAccumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* A Sequence that wraps the results of a ScanQuery and skips a given number of rows. It is used to implement
* the "offset" feature.
*/
public class ScanQueryOffsetSequence extends YieldingSequenceBase<ScanResultValue>
{
private final Sequence<ScanResultValue> baseSequence;
private final long skip;
public ScanQueryOffsetSequence(Sequence<ScanResultValue> baseSequence, long skip)
{
this.baseSequence = baseSequence;
this.skip = skip;
if (skip < 1) {
throw new IAE("'skip' must be greater than zero");
}
}
@Override
public <OutType> Yielder<OutType> toYielder(
final OutType initValue,
final YieldingAccumulator<OutType, ScanResultValue> accumulator
)
{
final SkippingYieldingAccumulator<OutType> skippingAccumulator = new SkippingYieldingAccumulator<>(accumulator);
return wrapYielder(baseSequence.toYielder(initValue, skippingAccumulator), skippingAccumulator);
}
private <OutType> Yielder<OutType> wrapYielder(
final Yielder<OutType> yielder,
final SkippingYieldingAccumulator<OutType> accumulator
)
{
return new Yielder<OutType>()
{
@Override
public OutType get()
{
return yielder.get();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
return wrapYielder(yielder.next(initValue), accumulator);
}
@Override
public boolean isDone()
{
return yielder.isDone();
}
@Override
public void close() throws IOException
{
yielder.close();
}
};
}
private class SkippingYieldingAccumulator<OutType> extends DelegatingYieldingAccumulator<OutType, ScanResultValue>
{
private long skipped = 0;
public SkippingYieldingAccumulator(final YieldingAccumulator<OutType, ScanResultValue> accumulator)
{
super(accumulator);
}
@Override
public OutType accumulate(OutType accumulated, ScanResultValue result)
{
if (skipped < skip) {
final long toSkip = skip - skipped;
final List<?> rows = (List) result.getEvents();
if (toSkip >= rows.size()) {
// Skip everything.
skipped += rows.size();
return accumulated;
} else {
// Skip partially.
final List<?> newEvents = rows.stream().skip(toSkip).collect(Collectors.toList());
skipped += toSkip;
return super.accumulate(
accumulated,
new ScanResultValue(result.getSegmentId(), result.getColumns(), newEvents)
);
}
} else {
return super.accumulate(accumulated, result);
}
}
}
}


@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.GenericQueryMetricsFactory; import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.aggregation.MetricManipulationFn;
@ -69,19 +68,41 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
return (queryPlus, responseContext) -> { return (queryPlus, responseContext) -> {
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it // Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
// the same way, even if they have different default legacy values. // the same way, even if they have different default legacy values.
final ScanQuery scanQuery = ((ScanQuery) (queryPlus.getQuery())) //
.withNonNullLegacy(scanQueryConfig); // Also, remove "offset" and add it to the "limit" (we won't push the offset down, just apply it here, at the
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); // merge at the top of the stack).
if (scanQuery.getScanRowsLimit() == Long.MAX_VALUE) { final ScanQuery originalQuery = ((ScanQuery) (queryPlus.getQuery()));
return runner.run(queryPlusWithNonNullLegacy, responseContext);
final long newLimit;
if (!originalQuery.isLimited()) {
// Unlimited stays unlimited.
newLimit = Long.MAX_VALUE;
} else if (originalQuery.getScanRowsLimit() > Long.MAX_VALUE - originalQuery.getScanRowsOffset()) {
throw new ISE(
"Cannot apply limit[%d] with offset[%d] due to overflow",
originalQuery.getScanRowsLimit(),
originalQuery.getScanRowsOffset()
);
} else {
newLimit = originalQuery.getScanRowsLimit() + originalQuery.getScanRowsOffset();
} }
return new BaseSequence<>(
final ScanQuery queryToRun = originalQuery.withNonNullLegacy(scanQueryConfig)
.withOffset(0)
.withLimit(newLimit);
final Sequence<ScanResultValue> results;
if (!queryToRun.isLimited()) {
results = runner.run(queryPlus.withQuery(queryToRun), responseContext);
} else {
results = new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>() new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
{ {
@Override @Override
public ScanQueryLimitRowIterator make() public ScanQueryLimitRowIterator make()
{ {
return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); return new ScanQueryLimitRowIterator(runner, queryPlus.withQuery(queryToRun), responseContext);
} }
@Override @Override
@ -90,6 +111,13 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
CloseQuietly.close(iterFromMake); CloseQuietly.close(iterFromMake);
} }
}); });
}
if (originalQuery.getScanRowsOffset() > 0) {
return new ScanQueryOffsetSequence(results, originalQuery.getScanRowsOffset());
} else {
return results;
}
}; };
} }
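
A small worked sketch of the rewrite above, with made-up numbers (the real code also handles the unlimited case and guards against overflow, as shown in the diff):

    long offset = 10;                       // user-supplied offset (hypothetical)
    long limit = 100;                       // user-supplied limit (hypothetical)

    long downstreamLimit = limit + offset;  // data servers are asked for 110 rows, offset 0
    // After merging, ScanQueryOffsetSequence discards the first 10 rows,
    // so the caller receives rows 10..109 -- exactly `limit` rows.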


@ -20,9 +20,10 @@
package org.apache.druid.query.scan; package org.apache.druid.query.scan;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.collections.StableLimitingSorter;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
@ -30,7 +31,7 @@ import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator; import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
@ -47,14 +48,11 @@ import org.apache.druid.segment.Segment;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -124,7 +122,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
if (query.getScanRowsLimit() <= maxRowsQueuedForOrdering) { if (query.getScanRowsLimit() <= maxRowsQueuedForOrdering) {
// Use priority queue strategy // Use priority queue strategy
try { try {
return priorityQueueSortAndLimit( return stableLimitingSort(
Sequences.concat(Sequences.map( Sequences.concat(Sequences.map(
Sequences.simple(queryRunnersOrdered), Sequences.simple(queryRunnersOrdered),
input -> input.run(queryPlus, responseContext) input -> input.run(queryPlus, responseContext)
@ -204,14 +202,18 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}; };
} }
/**
* Returns a sorted and limited copy of the provided {@param inputSequence}. Materializes the full sequence
* in memory before returning it. The amount of memory use is limited by the limit of the {@param scanQuery}.
*/
@VisibleForTesting @VisibleForTesting
Sequence<ScanResultValue> priorityQueueSortAndLimit( Sequence<ScanResultValue> stableLimitingSort(
Sequence<ScanResultValue> inputSequence, Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery, ScanQuery scanQuery,
List<Interval> intervalsOrdered List<Interval> intervalsOrdered
) throws IOException ) throws IOException
{ {
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); Comparator<ScanResultValue> comparator = scanQuery.getResultOrdering();
if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) { if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
throw new UOE( throw new UOE(
@ -224,20 +226,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE) // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
int limit = Math.toIntExact(scanQuery.getScanRowsLimit()); int limit = Math.toIntExact(scanQuery.getScanRowsLimit());
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator); final StableLimitingSorter<ScanResultValue> sorter = new StableLimitingSorter<>(comparator, limit);
Yielder<ScanResultValue> yielder = inputSequence.toYielder( Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@Override
public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
{
yield();
return in;
}
}
);
try { try {
boolean doneScanning = yielder.isDone(); boolean doneScanning = yielder.isDone();
@ -251,10 +242,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
numRowsScanned++; numRowsScanned++;
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format // needs to be preserved for queries using the compactedList result format
q.offer(srv); sorter.add(srv);
if (q.size() > limit) {
q.poll();
}
// Finish scanning the interval containing the limit row // Finish scanning the interval containing the limit row
if (numRowsScanned > limit && finalInterval == null) { if (numRowsScanned > limit && finalInterval == null) {
@ -265,7 +253,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
} }
} }
if (finalInterval == null) { if (finalInterval == null) {
throw new ISE("WTH??? Row came from an unscanned interval?"); throw new ISE("Row unexpectedly came from an unscanned interval");
} }
} }
} }
@ -274,13 +262,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
(finalInterval != null && (finalInterval != null &&
!finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat()))); !finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat())));
} }
// Need to convert to a Deque because Priority Queue's iterator doesn't guarantee that the sorted order
// will be maintained. Deque was chosen over list because its addFirst is O(1). final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size()); Iterators.addAll(sortedElements, sorter.drain());
while (q.size() != 0) {
// addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first.
sortedElements.addFirst(q.poll());
}
return Sequences.simple(sortedElements); return Sequences.simple(sortedElements);
} }
finally { finally {
@ -343,9 +327,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
) )
).flatMerge( ).flatMerge(
seq -> seq, seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator( queryPlus.getQuery().getResultOrdering()
(ScanQuery) queryPlus.getQuery()
)).reverse()
) )
) )
); );
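
As a side note, a minimal sketch of the StableLimitingSorter used by stableLimitingSort above (values and comparator are made up; it assumes drain() yields the retained elements smallest-first under the comparator and that ties keep insertion order):

    import com.google.common.collect.Iterators;
    import org.apache.druid.collections.StableLimitingSorter;

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Keep only the 3 smallest values out of 5.
    StableLimitingSorter<Integer> sorter =
        new StableLimitingSorter<>(Comparator.<Integer>naturalOrder(), 3);
    for (int v : new int[]{5, 1, 4, 1, 3}) {
      sorter.add(v);
    }
    List<Integer> out = new ArrayList<>(sorter.size());  // size() == min(count, limit) == 3
    Iterators.addAll(out, sorter.drain());                // expected: [1, 1, 3]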


@ -45,7 +45,7 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResult
int comparison = Longs.compare( int comparison = Longs.compare(
o1.getFirstEventTimestamp(scanQuery.getResultFormat()), o1.getFirstEventTimestamp(scanQuery.getResultFormat()),
o2.getFirstEventTimestamp(scanQuery.getResultFormat())); o2.getFirstEventTimestamp(scanQuery.getResultFormat()));
if (scanQuery.getOrder().equals(ScanQuery.Order.DESCENDING)) { if (scanQuery.getOrder().equals(ScanQuery.Order.ASCENDING)) {
return comparison; return comparison;
} }
return comparison * -1; return comparison * -1;


@ -37,6 +37,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -130,6 +131,12 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
@Override @Override
public int getNumRows() public int getNumRows()
{ {
if (rowIterable instanceof Collection) {
return ((Collection<RowType>) rowIterable).size();
}
// getNumRows is only used by tests and by segmentMetadataQuery (which would be odd to call on inline datasources)
// so no big deal if it doesn't always work.
throw new UnsupportedOperationException("Cannot retrieve number of rows"); throw new UnsupportedOperationException("Cannot retrieve number of rows");
} }


@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.io.CharSource; import com.google.common.io.CharSource;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
@ -38,7 +39,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.LegacySegmentSpec; import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.IncrementalIndexSegment;
@ -65,7 +65,7 @@ import java.util.List;
* *
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class MultiSegmentScanQueryTest public class MultiSegmentScanQueryTest extends NullHandlingTest
{ {
private static final ScanQueryQueryToolChest TOOL_CHEST = new ScanQueryQueryToolChest( private static final ScanQueryQueryToolChest TOOL_CHEST = new ScanQueryQueryToolChest(
new ScanQueryConfig(), new ScanQueryConfig(),
@ -164,21 +164,24 @@ public class MultiSegmentScanQueryTest
IOUtils.closeQuietly(segment1); IOUtils.closeQuietly(segment1);
} }
@Parameterized.Parameters(name = "limit={0},batchSize={1}") @Parameterized.Parameters(name = "limit={0},offset={1},batchSize={2}")
public static Iterable<Object[]> constructorFeeder() public static Iterable<Object[]> constructorFeeder()
{ {
return QueryRunnerTestHelper.cartesian( return QueryRunnerTestHelper.cartesian(
Arrays.asList(0, 1, 3, 7, 10, 20, 1000), Arrays.asList(0, 1, 3, 7, 10, 20, 1000),
Arrays.asList(0, 1, 3, 5, 7, 10, 20, 200, 1000),
Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000) Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000)
); );
} }
private final int limit; private final int limit;
private final int offset;
private final int batchSize; private final int batchSize;
public MultiSegmentScanQueryTest(int limit, int batchSize) public MultiSegmentScanQueryTest(int limit, int offset, int batchSize)
{ {
this.limit = limit; this.limit = limit;
this.offset = offset;
this.batchSize = batchSize; this.batchSize = batchSize;
} }
@ -190,11 +193,12 @@ public class MultiSegmentScanQueryTest
.batchSize(batchSize) .batchSize(batchSize)
.columns(Collections.emptyList()) .columns(Collections.emptyList())
.legacy(false) .legacy(false)
.limit(limit); .limit(limit)
.offset(offset);
} }
@Test @Test
public void testMergeRunnersWithLimit() public void testMergeRunnersWithLimitAndOffset()
{ {
ScanQuery query = newBuilder().build(); ScanQuery query = newBuilder().build();
List<ScanResultValue> results = FACTORY List<ScanResultValue> results = FACTORY
@ -216,17 +220,10 @@ public class MultiSegmentScanQueryTest
} }
@Test @Test
public void testMergeResultsWithLimit() public void testMergeResultsWithLimitAndOffset()
{ {
QueryRunner<ScanResultValue> runner = TOOL_CHEST.mergeResults( QueryRunner<ScanResultValue> runner = TOOL_CHEST.mergeResults(
new QueryRunner<ScanResultValue>() (queryPlus, responseContext) -> {
{
@Override
public Sequence<ScanResultValue> run(
QueryPlus<ScanResultValue> queryPlus,
ResponseContext responseContext
)
{
// simulate results back from 2 historicals // simulate results back from 2 historicals
List<Sequence<ScanResultValue>> sequences = Lists.newArrayListWithExpectedSize(2); List<Sequence<ScanResultValue>> sequences = Lists.newArrayListWithExpectedSize(2);
sequences.add(FACTORY.createRunner(segment0).run(queryPlus)); sequences.add(FACTORY.createRunner(segment0).run(queryPlus));
@ -236,7 +233,6 @@ public class MultiSegmentScanQueryTest
Sequences.simple(sequences) Sequences.simple(sequences)
); );
} }
}
); );
ScanQuery query = newBuilder().build(); ScanQuery query = newBuilder().build();
List<ScanResultValue> results = runner.run(QueryPlus.wrap(query)).toList(); List<ScanResultValue> results = runner.run(QueryPlus.wrap(query)).toList();
@ -246,7 +242,12 @@ public class MultiSegmentScanQueryTest
} }
Assert.assertEquals( Assert.assertEquals(
totalCount, totalCount,
limit != 0 ? Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length Math.max(
0,
limit != 0
? Math.min(limit, V_0112.length + V_0113.length - offset)
: V_0112.length + V_0113.length - offset
)
); );
} }
} }


@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.guava.Sequences;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class ScanQueryOffsetSequenceTest
{
private final List<List<Integer>> rowList1 = ImmutableList.of(
ImmutableList.of(1, 2),
ImmutableList.of(3, 4),
ImmutableList.of(5, 6)
);
private final List<List<Integer>> rowList2 = ImmutableList.of(
ImmutableList.of(7, 8),
ImmutableList.of(9, 10),
ImmutableList.of(11, 12)
);
@Test
public void testSkip()
{
final List<ScanResultValue> unskipped = makeExpectedResults(0);
for (int skip = 1; skip <= rowList1.size() + rowList2.size() + 1; skip++) {
final List<ScanResultValue> expected = makeExpectedResults(skip);
final List<ScanResultValue> resultsAfterSkip = new ScanQueryOffsetSequence(
Sequences.simple(unskipped),
skip
).toList();
Assert.assertEquals("skip = " + skip, expected, resultsAfterSkip);
}
}
/**
* Return a list of 0, 1, or 2 {@link ScanResultValue} based on the "skip" parameter. Rows are taken from
* {@link #rowList1} and {@link #rowList2}.
*/
private List<ScanResultValue> makeExpectedResults(final int skip)
{
final List<ScanResultValue> expected;
if (skip < rowList1.size()) {
// Somewhere inside the first ScanResultValue.
expected = ImmutableList.of(
new ScanResultValue(
"1",
ImmutableList.of("a", "b"),
rowList1.subList(skip, rowList1.size())
),
new ScanResultValue(
"2",
ImmutableList.of("b", "c"),
rowList2
)
);
} else if (skip < rowList1.size() + rowList2.size()) {
// Somewhere inside the second ScanResultValue.
expected = ImmutableList.of(
new ScanResultValue(
"2",
ImmutableList.of("b", "c"),
rowList2.subList(skip - rowList1.size(), rowList2.size())
)
);
} else {
// Past the second ScanResultValue.
expected = ImmutableList.of();
}
return expected;
}
}


@ -0,0 +1,409 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Tests the order in which Scan query results come back.
*
* Ensures that we have run-to-run stability of result order, which is important for offset-based pagination.
*/
@RunWith(Parameterized.class)
public class ScanQueryResultOrderingTest
{
private static final String DATASOURCE = "datasource";
private static final String ID_COLUMN = "id";
private static final RowAdapter<Object[]> ROW_ADAPTER = new RowAdapter<Object[]>()
{
@Override
public ToLongFunction<Object[]> timestampFunction()
{
return row -> ((DateTime) row[0]).getMillis();
}
@Override
public Function<Object[], Object> columnFunction(String columnName)
{
if (ID_COLUMN.equals(columnName)) {
return row -> row[1];
} else if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
return timestampFunction()::applyAsLong;
} else {
return row -> null;
}
}
};
private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
.addTimeColumn()
.add(ID_COLUMN, ValueType.LONG)
.build();
private static final List<RowBasedSegment<Object[]>> SEGMENTS = ImmutableList.of(
new RowBasedSegment<>(
SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 0),
ImmutableList.of(
new Object[]{DateTimes.of("2000T01"), 101},
new Object[]{DateTimes.of("2000T01"), 80},
new Object[]{DateTimes.of("2000T01"), 232},
new Object[]{DateTimes.of("2000T01"), 12},
new Object[]{DateTimes.of("2000T02"), 808},
new Object[]{DateTimes.of("2000T02"), 411},
new Object[]{DateTimes.of("2000T02"), 383},
new Object[]{DateTimes.of("2000T05"), 22}
),
ROW_ADAPTER,
ROW_SIGNATURE
),
new RowBasedSegment<>(
SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 1),
ImmutableList.of(
new Object[]{DateTimes.of("2000T01"), 333},
new Object[]{DateTimes.of("2000T01"), 222},
new Object[]{DateTimes.of("2000T01"), 444},
new Object[]{DateTimes.of("2000T01"), 111},
new Object[]{DateTimes.of("2000T03"), 555},
new Object[]{DateTimes.of("2000T03"), 999},
new Object[]{DateTimes.of("2000T03"), 888},
new Object[]{DateTimes.of("2000T05"), 777}
),
ROW_ADAPTER,
ROW_SIGNATURE
),
new RowBasedSegment<>(
SegmentId.of(DATASOURCE, Intervals.of("2000-01-02/P1D"), "1", 0),
ImmutableList.of(
new Object[]{DateTimes.of("2000-01-02T00"), 7},
new Object[]{DateTimes.of("2000-01-02T02"), 9},
new Object[]{DateTimes.of("2000-01-02T03"), 8}
),
ROW_ADAPTER,
ROW_SIGNATURE
)
);
private final List<Integer> segmentToServerMap;
private final int limit;
private final int batchSize;
private final int maxRowsQueuedForOrdering;
private ScanQueryRunnerFactory queryRunnerFactory;
private List<QueryRunner<ScanResultValue>> segmentRunners;
@Parameterized.Parameters(name = "Segment-to-server map[{0}], limit[{1}], batchSize[{2}], maxRowsQueuedForOrdering[{3}]")
public static Iterable<Object[]> constructorFeeder()
{
// Set number of servers equal to number of segments, then try all possible distributions of segments to servers.
final int numServers = SEGMENTS.size();
final Set<List<Integer>> segmentToServerMaps = Sets.cartesianProduct(
IntStream.range(0, SEGMENTS.size())
.mapToObj(i -> IntStream.range(0, numServers).boxed().collect(Collectors.toSet()))
.collect(Collectors.toList())
);
// Try every limit up to one past the total number of rows.
final Set<Integer> limits = new TreeSet<>();
final int totalNumRows = SEGMENTS.stream().mapToInt(s -> s.asStorageAdapter().getNumRows()).sum();
for (int i = 0; i <= totalNumRows + 1; i++) {
limits.add(i);
}
// Try various batch sizes.
final Set<Integer> batchSizes = ImmutableSortedSet.of(1, 2, 100);
final Set<Integer> maxRowsQueuedForOrderings = ImmutableSortedSet.of(1, 7, 100000);
return Sets.cartesianProduct(
segmentToServerMaps,
limits,
batchSizes,
maxRowsQueuedForOrderings
).stream().map(args -> args.toArray(new Object[0])).collect(Collectors.toList());
}
public ScanQueryResultOrderingTest(
final List<Integer> segmentToServerMap,
final int limit,
final int batchSize,
final int maxRowsQueuedForOrdering
)
{
this.segmentToServerMap = segmentToServerMap;
this.limit = limit;
this.batchSize = batchSize;
this.maxRowsQueuedForOrdering = maxRowsQueuedForOrdering;
}
@Before
public void setUp()
{
queryRunnerFactory = new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory()
),
new ScanQueryEngine(),
new ScanQueryConfig()
);
segmentRunners = SEGMENTS.stream().map(queryRunnerFactory::createRunner).collect(Collectors.toList());
}
@Test
public void testOrderNone()
{
assertResultsEquals(
Druids.newScanQueryBuilder()
.dataSource("ds")
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
.columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
.order(ScanQuery.Order.NONE)
.build(),
ImmutableList.of(
101,
80,
232,
12,
808,
411,
383,
22,
333,
222,
444,
111,
555,
999,
888,
777,
7,
9,
8
)
);
}
@Test
public void testOrderTimeAscending()
{
assertResultsEquals(
Druids.newScanQueryBuilder()
.dataSource("ds")
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
.columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
.order(ScanQuery.Order.ASCENDING)
.build(),
ImmutableList.of(
101,
80,
232,
12,
333,
222,
444,
111,
808,
411,
383,
555,
999,
888,
22,
777,
7,
9,
8
)
);
}
@Test
public void testOrderTimeDescending()
{
assertResultsEquals(
Druids.newScanQueryBuilder()
.dataSource("ds")
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
.columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
.order(ScanQuery.Order.DESCENDING)
.build(),
ImmutableList.of(
8,
9,
7,
777,
22,
888,
999,
555,
383,
411,
808,
111,
444,
222,
333,
12,
232,
80,
101
)
);
}
private void assertResultsEquals(final ScanQuery query, final List<Integer> expectedResults)
{
final List<List<Pair<SegmentId, QueryRunner<ScanResultValue>>>> serverRunners = new ArrayList<>();
for (int i = 0; i <= segmentToServerMap.stream().max(Comparator.naturalOrder()).orElse(0); i++) {
serverRunners.add(new ArrayList<>());
}
for (int segmentNumber = 0; segmentNumber < segmentToServerMap.size(); segmentNumber++) {
final SegmentId segmentId = SEGMENTS.get(segmentNumber).getId();
final int serverNumber = segmentToServerMap.get(segmentNumber);
serverRunners.get(serverNumber).add(Pair.of(segmentId, segmentRunners.get(segmentNumber)));
}
// Simulates what the Historical servers would do.
final List<QueryRunner<ScanResultValue>> mergedServerRunners =
serverRunners.stream()
.filter(runners -> !runners.isEmpty())
.map(
runners ->
queryRunnerFactory.getToolchest().mergeResults(
new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
final QueryPlus<ScanResultValue> queryPlus,
final ResponseContext responseContext
)
{
return queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
runners.stream().map(p -> p.rhs).collect(Collectors.toList())
).run(
queryPlus.withQuery(
queryPlus.getQuery()
.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
runners.stream()
.map(p -> p.lhs.toDescriptor())
.collect(Collectors.toList())
)
)
),
responseContext
);
}
}
)
)
.collect(Collectors.toList());
// Simulates what the Broker would do.
final QueryRunner<ScanResultValue> brokerRunner = queryRunnerFactory.getToolchest().mergeResults(
(queryPlus, responseContext) -> {
final List<Sequence<ScanResultValue>> sequences =
mergedServerRunners.stream()
.map(runner -> runner.run(queryPlus.withoutThreadUnsafeState()))
.collect(Collectors.toList());
return new MergeSequence<>(
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(sequences)
);
}
);
// Finally: run the query.
final List<Integer> results = runQuery(
(ScanQuery) Druids.ScanQueryBuilder.copy(query)
.limit(limit)
.batchSize(batchSize)
.build()
.withOverriddenContext(
ImmutableMap.of(
ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING,
maxRowsQueuedForOrdering
)
),
brokerRunner
);
Assert.assertEquals(
expectedResults.stream().limit(limit == 0 ? Long.MAX_VALUE : limit).collect(Collectors.toList()),
results
);
}
private List<Integer> runQuery(final ScanQuery query, final QueryRunner<ScanResultValue> brokerRunner)
{
final List<Object[]> results = queryRunnerFactory.getToolchest().resultsAsArrays(
query,
brokerRunner.run(QueryPlus.wrap(query))
).toList();
return results.stream().mapToInt(row -> (int) row[1]).boxed().collect(Collectors.toList());
}
}


@ -138,7 +138,7 @@ public class ScanQueryRunnerFactoryTest
}); });
Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs); Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
try { try {
List<ScanResultValue> output = FACTORY.priorityQueueSortAndLimit( List<ScanResultValue> output = FACTORY.stableLimitingSort(
inputSequence, inputSequence,
query, query,
ImmutableList.of(new Interval( ImmutableList.of(new Interval(


@ -22,6 +22,7 @@ package org.apache.druid.query.scan;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.LegacySegmentSpec; import org.apache.druid.query.spec.LegacySegmentSpec;
@ -36,7 +37,7 @@ public class ScanQuerySpecTest
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
@Test @Test
public void testSerializationLegacyString() throws Exception public void testSerialization() throws Exception
{ {
String legacy = String legacy =
"{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"}," "{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"},"
@ -67,6 +68,7 @@ public class ScanQuerySpecTest
VirtualColumns.EMPTY, VirtualColumns.EMPTY,
ScanQuery.ResultFormat.RESULT_FORMAT_LIST, ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
0, 0,
0,
3, 3,
ScanQuery.Order.NONE, ScanQuery.Order.NONE,
null, null,
@ -80,4 +82,27 @@ public class ScanQuerySpecTest
Assert.assertEquals(query, JSON_MAPPER.readValue(actual, ScanQuery.class)); Assert.assertEquals(query, JSON_MAPPER.readValue(actual, ScanQuery.class));
Assert.assertEquals(query, JSON_MAPPER.readValue(legacy, ScanQuery.class)); Assert.assertEquals(query, JSON_MAPPER.readValue(legacy, ScanQuery.class));
} }
@Test
public void testSerializationLegacyString() throws Exception
{
ScanQuery query = new ScanQuery(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")),
VirtualColumns.EMPTY,
ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
0,
1,
3,
ScanQuery.Order.NONE,
null,
Arrays.asList("market", "quality", "index"),
null,
null
);
final String serialized = JSON_MAPPER.writeValueAsString(query);
final ScanQuery deserialized = (ScanQuery) JSON_MAPPER.readValue(serialized, Query.class);
Assert.assertEquals(query, deserialized);
}
} }


@ -82,7 +82,7 @@ public class ScanResultValueTimestampComparatorTest
events2 events2
); );
Assert.assertEquals(-1, comparator.compare(s1, s2)); Assert.assertEquals(1, comparator.compare(s1, s2));
} }
@Test @Test
@ -119,7 +119,7 @@ public class ScanResultValueTimestampComparatorTest
events2 events2
); );
Assert.assertEquals(1, comparator.compare(s1, s2)); Assert.assertEquals(-1, comparator.compare(s1, s2));
} }
@Test @Test
@ -154,7 +154,7 @@ public class ScanResultValueTimestampComparatorTest
events2 events2
); );
Assert.assertEquals(-1, comparator.compare(s1, s2)); Assert.assertEquals(1, comparator.compare(s1, s2));
} }
@Test @Test
@ -189,6 +189,6 @@ public class ScanResultValueTimestampComparatorTest
events2 events2
); );
Assert.assertEquals(1, comparator.compare(s1, s2)); Assert.assertEquals(-1, comparator.compare(s1, s2));
} }
} }


@ -995,6 +995,7 @@ public class DruidQuery
getVirtualColumns(true), getVirtualColumns(true),
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST, ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
0, 0,
0,
scanLimit, scanLimit,
order, order,
filtration.getDimFilter(), filtration.getDimFilter(),


@ -1059,7 +1059,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of(), ImmutableList.of(),
ImmutableList.of( ImmutableList.of(
new Object[]{ new Object[]{
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" "DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n"
} }
) )
); );
@ -7403,7 +7403,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
final String explanation = final String explanation =
"DruidOuterQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"__subquery__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"limit\":2147483647,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"skipEmptyBuckets\":true,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"}}], signature=[{a0:LONG}])\n" "DruidOuterQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"__subquery__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"limit\":2147483647,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"skipEmptyBuckets\":true,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"}}], signature=[{a0:LONG}])\n"
+ " DruidJoinQueryRel(condition=[=(SUBSTRING($3, 1, 1), $8)], joinType=[inner], query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"__join__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim2\",\"outputName\":\"d0\",\"outputType\":\"STRING\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n" + " DruidJoinQueryRel(condition=[=(SUBSTRING($3, 1, 1), $8)], joinType=[inner], query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"__join__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim2\",\"outputName\":\"d0\",\"outputType\":\"STRING\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":null,\"extractionFn\":null}},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"extraction\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\",\"extractionFn\":{\"type\":\"substring\",\"index\":0,\"length\":1}}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n"; + " DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":null,\"extractionFn\":null}},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"extraction\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\",\"extractionFn\":{\"type\":\"substring\",\"index\":0,\"length\":1}}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n";
testQuery( testQuery(