From 6cca7242de629ad07478c6792b01dc1f37dc6b40 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 13 Aug 2020 14:56:24 -0700 Subject: [PATCH] Add "offset" parameter to the Scan query. (#10233) * Add "offset" parameter to the Scan query. It works by doing the query as normal and then throwing away the first "offset" number of rows on the broker. * Fix constructor call. * Fix up JSONs. * Fix call to ScanQuery. * Doc update. * Fix javadocs. * Spotbugs, LGTM suppressions. * Javadocs. * Fix suppression. * Stabilize Scan query result order, add tests. * Update LGTM comment. * Fixup. * Test different batch sizes too. * Nicer tests. * Fix comment. --- codestyle/spotbugs-exclude.xml | 1 + .../collections/StableLimitingSorter.java | 15 +- docs/querying/scan-query.md | 1 + docs/querying/select-query.md | 2 - ...bstractMultiPhaseParallelIndexingTest.java | 1 + .../java/org/apache/druid/query/Druids.java | 11 + .../apache/druid/query/scan/ScanQuery.java | 99 ++++- .../query/scan/ScanQueryOffsetSequence.java | 128 ++++++ .../query/scan/ScanQueryQueryToolChest.java | 68 ++- .../query/scan/ScanQueryRunnerFactory.java | 54 +-- .../ScanResultValueTimestampComparator.java | 2 +- .../druid/segment/RowBasedStorageAdapter.java | 7 + .../query/scan/MultiSegmentScanQueryTest.java | 61 +-- .../scan/ScanQueryOffsetSequenceTest.java | 97 +++++ .../scan/ScanQueryResultOrderingTest.java | 409 ++++++++++++++++++ .../scan/ScanQueryRunnerFactoryTest.java | 2 +- .../druid/query/scan/ScanQuerySpecTest.java | 27 +- ...canResultValueTimestampComparatorTest.java | 8 +- .../druid/sql/calcite/rel/DruidQuery.java | 1 + .../druid/sql/calcite/CalciteQueryTest.java | 4 +- 20 files changed, 892 insertions(+), 106 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/scan/ScanQueryOffsetSequence.java create mode 100644 processing/src/test/java/org/apache/druid/query/scan/ScanQueryOffsetSequenceTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml index 973aa7a22a3..2092f31db3a 100644 --- a/codestyle/spotbugs-exclude.xml +++ b/codestyle/spotbugs-exclude.xml @@ -33,6 +33,7 @@ + diff --git a/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java index 318751c3003..76208bc05f9 100644 --- a/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java +++ b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java @@ -22,6 +22,7 @@ package org.apache.druid.collections; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; import java.util.Comparator; import java.util.Iterator; @@ -40,11 +41,13 @@ import java.util.Objects; public class StableLimitingSorter { private final MinMaxPriorityQueue> queue; + private final int limit; - private long count = 0; + private long size = 0; public StableLimitingSorter(final Comparator comparator, final int limit) { + this.limit = limit; this.queue = MinMaxPriorityQueue .orderedBy( Ordering.from( @@ -61,7 +64,15 @@ public class StableLimitingSorter */ public void add(T element) { - queue.offer(new NumberedElement<>(element, count++)); + queue.offer(new NumberedElement<>(element, size++)); + } + + /** + * Returns the number of elements currently in the sorter. 
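+   * The count is capped at the configured limit, since the underlying queue never retains more than
+   * {@code limit} elements.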
+ */ + public int size() + { + return Ints.checkedCast(Math.min(size, limit)); } /** diff --git a/docs/querying/scan-query.md b/docs/querying/scan-query.md index bd6a4da6e65..58f1324d389 100644 --- a/docs/querying/scan-query.md +++ b/docs/querying/scan-query.md @@ -62,6 +62,7 @@ The following are the main parameters for Scan queries: |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| |batchSize|The maximum number of rows buffered before being returned to the client. Default is `20480`|no| |limit|How many rows to return. If not specified, all rows will be returned.|no| +|offset|Skip this many rows when returning results. Skipped rows will still need to be generated internally and then discarded, meaning that raising offsets to high values can cause queries to use additional resources.
Together, "limit" and "offset" can be used to implement pagination. However, note that if the underlying datasource is modified in between page fetches in ways that affect overall query results, then the different pages will not necessarily align with each other.|no| |order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met.|none| |legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no| |context|An additional JSON Object which can be used to specify certain flags (see the `query context properties` section below).|no| diff --git a/docs/querying/select-query.md b/docs/querying/select-query.md index be7ede132b0..734073dc531 100644 --- a/docs/querying/select-query.md +++ b/docs/querying/select-query.md @@ -25,5 +25,3 @@ sidebar_label: "Select" Older versions of Apache Druid included a Select query type. Since Druid 0.17.0, it has been removed and replaced by the [Scan query](../querying/scan-query.md), which offers improved memory usage and performance. This solves issues that users had with Select queries causing Druid to run out of memory or slow down. - -The Scan query has a different syntax, but supports many of the features of the Select query, including time ordering and limiting. Scan does not include the Select query's pagination feature; however, in many cases pagination is unnecessary with Scan due to its ability to return a virtually unlimited number of results in one call. 
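For illustration only, here is a minimal sketch (not part of this patch) of how the new `offset` parameter pairs with `limit` for pagination, using the `offset()` method this change adds to `Druids.ScanQueryBuilder`. The class name, datasource, interval, and column names below are hypothetical:

```java
import java.util.Collections;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;

public class ScanPaginationExample
{
  public static void main(String[] args)
  {
    // Hypothetical datasource, interval, and column names; only offset()/limit() come from this patch.
    final ScanQuery page3 = Druids.newScanQueryBuilder()
        .dataSource("wikipedia")
        .intervals(new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.of("2020-01-01/2020-02-01"))))
        .columns("__time", "page", "user")
        .order(ScanQuery.Order.ASCENDING) // a stable order keeps pages consistent across fetches
        .offset(200)                      // skip the first two 100-row pages
        .limit(100)                       // return the third page
        .build();

    System.out.println(page3);
  }
}
```

Internally, the merge step adds the offset to the pushed-down limit (300 rows in this sketch) and then discards the first 200 rows at the top of the stack, which is why large offsets still consume query resources, as noted in the documentation above.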
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index 966a4c827cb..1499ba4b380 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -269,6 +269,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn null, 0, 0, + 0, null, null, columns, diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index 21db123d89e..d5e8e1a1970 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -63,6 +63,7 @@ import java.util.Set; import java.util.UUID; /** + * */ public class Druids { @@ -789,6 +790,7 @@ public class Druids private Map context; private ScanQuery.ResultFormat resultFormat; private int batchSize; + private long offset; private long limit; private DimFilter dimFilter; private List columns; @@ -803,6 +805,7 @@ public class Druids context = null; resultFormat = null; batchSize = 0; + offset = 0; limit = 0; dimFilter = null; columns = new ArrayList<>(); @@ -818,6 +821,7 @@ public class Druids virtualColumns, resultFormat, batchSize, + offset, limit, order, dimFilter, @@ -835,6 +839,7 @@ public class Druids .virtualColumns(query.getVirtualColumns()) .resultFormat(query.getResultFormat()) .batchSize(query.getBatchSize()) + .offset(query.getScanRowsOffset()) .limit(query.getScanRowsLimit()) .filters(query.getFilter()) .columns(query.getColumns()) @@ -890,6 +895,12 @@ public class Druids return this; } + public ScanQueryBuilder offset(long o) + { + offset = o; + return this; + } + public ScanQueryBuilder limit(long l) { limit = l; diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index b87a1c4c4bb..347e675cea3 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -21,6 +21,7 @@ package org.apache.druid.query.scan; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Preconditions; @@ -37,6 +38,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import javax.annotation.Nullable; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -110,7 +112,7 @@ public class ScanQuery extends BaseQuery private final VirtualColumns virtualColumns; private final ResultFormat resultFormat; private final int batchSize; - @JsonProperty("limit") + private final long scanRowsOffset; private final long scanRowsLimit; private final DimFilter dimFilter; private final List columns; @@ -126,6 +128,7 @@ public class ScanQuery extends BaseQuery @JsonProperty("virtualColumns") VirtualColumns virtualColumns, @JsonProperty("resultFormat") ResultFormat 
resultFormat, @JsonProperty("batchSize") int batchSize, + @JsonProperty("offset") long scanRowsOffset, @JsonProperty("limit") long scanRowsLimit, @JsonProperty("order") Order order, @JsonProperty("filter") DimFilter dimFilter, @@ -142,6 +145,11 @@ public class ScanQuery extends BaseQuery this.batchSize > 0, "batchSize must be greater than 0" ); + this.scanRowsOffset = scanRowsOffset; + Preconditions.checkArgument( + this.scanRowsOffset >= 0, + "offset must be greater than or equal to 0" + ); this.scanRowsLimit = (scanRowsLimit == 0) ? Long.MAX_VALUE : scanRowsLimit; Preconditions.checkArgument( this.scanRowsLimit > 0, @@ -202,12 +210,38 @@ public class ScanQuery extends BaseQuery return batchSize; } - @JsonProperty + /** + * Offset for this query; behaves like SQL "OFFSET". Zero means no offset. Negative values are invalid. + */ + @JsonProperty("offset") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public long getScanRowsOffset() + { + return scanRowsOffset; + } + + /** + * Limit for this query; behaves like SQL "LIMIT". Will always be positive. {@link Long#MAX_VALUE} is used in + * situations where the user wants an effectively unlimited resultset. + */ + @JsonProperty("limit") + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = ScanRowsLimitJsonIncludeFilter.class) public long getScanRowsLimit() { return scanRowsLimit; } + /** + * Returns whether this query is limited or not. Because {@link Long#MAX_VALUE} is used to signify unlimitedness, + * this is equivalent to {@code getScanRowsLimit() != Long.Max_VALUE}. + * + * @see #getScanRowsLimit() + */ + public boolean isLimited() + { + return scanRowsLimit != Long.MAX_VALUE; + } + @JsonProperty public Order getOrder() { @@ -268,7 +302,23 @@ public class ScanQuery extends BaseQuery if (order == Order.NONE) { return Ordering.natural(); } - return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse(); + return Ordering.from( + new ScanResultValueTimestampComparator(this).thenComparing( + order == Order.ASCENDING + ? 
Comparator.naturalOrder() + : Comparator.naturalOrder().reversed() + ) + ); + } + + public ScanQuery withOffset(final long newOffset) + { + return Druids.ScanQueryBuilder.copy(this).offset(newOffset).build(); + } + + public ScanQuery withLimit(final long newLimit) + { + return Druids.ScanQueryBuilder.copy(this).limit(newLimit).build(); } public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig) @@ -295,7 +345,7 @@ public class ScanQuery extends BaseQuery } @Override - public boolean equals(final Object o) + public boolean equals(Object o) { if (this == o) { return true; @@ -308,6 +358,7 @@ public class ScanQuery extends BaseQuery } final ScanQuery scanQuery = (ScanQuery) o; return batchSize == scanQuery.batchSize && + scanRowsOffset == scanQuery.scanRowsOffset && scanRowsLimit == scanQuery.scanRowsLimit && Objects.equals(legacy, scanQuery.legacy) && Objects.equals(virtualColumns, scanQuery.virtualColumns) && @@ -319,8 +370,17 @@ public class ScanQuery extends BaseQuery @Override public int hashCode() { - return Objects.hash(super.hashCode(), virtualColumns, resultFormat, batchSize, - scanRowsLimit, dimFilter, columns, legacy); + return Objects.hash( + super.hashCode(), + virtualColumns, + resultFormat, + batchSize, + scanRowsOffset, + scanRowsLimit, + dimFilter, + columns, + legacy + ); } @Override @@ -332,10 +392,35 @@ public class ScanQuery extends BaseQuery ", virtualColumns=" + getVirtualColumns() + ", resultFormat='" + resultFormat + '\'' + ", batchSize=" + batchSize + - ", scanRowsLimit=" + scanRowsLimit + + ", offset=" + scanRowsOffset + + ", limit=" + scanRowsLimit + ", dimFilter=" + dimFilter + ", columns=" + columns + ", legacy=" + legacy + '}'; } + + /** + * {@link JsonInclude} filter for {@link #getScanRowsLimit()}. + * + * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs + * exclusions (see spotbugs-exclude.xml). + */ + @SuppressWarnings({"EqualsAndHashcode"}) + static class ScanRowsLimitJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode] + { + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + + if (obj.getClass() == this.getClass()) { + return true; + } + + return obj instanceof Long && (long) obj == Long.MAX_VALUE; + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryOffsetSequence.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryOffsetSequence.java new file mode 100644 index 00000000000..6685a40d973 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryOffsetSequence.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.scan; + +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.guava.DelegatingYieldingAccumulator; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.apache.druid.java.util.common.guava.YieldingSequenceBase; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A Sequence that wraps the results of a ScanQuery and skips a given number of rows. It is used to implement + * the "offset" feature. + */ +public class ScanQueryOffsetSequence extends YieldingSequenceBase +{ + private final Sequence baseSequence; + private final long skip; + + public ScanQueryOffsetSequence(Sequence baseSequence, long skip) + { + this.baseSequence = baseSequence; + this.skip = skip; + + if (skip < 1) { + throw new IAE("'skip' must be greater than zero"); + } + } + + @Override + public Yielder toYielder( + final OutType initValue, + final YieldingAccumulator accumulator + ) + { + final SkippingYieldingAccumulator skippingAccumulator = new SkippingYieldingAccumulator<>(accumulator); + return wrapYielder(baseSequence.toYielder(initValue, skippingAccumulator), skippingAccumulator); + } + + private Yielder wrapYielder( + final Yielder yielder, + final SkippingYieldingAccumulator accumulator + ) + { + return new Yielder() + { + @Override + public OutType get() + { + return yielder.get(); + } + + @Override + public Yielder next(OutType initValue) + { + return wrapYielder(yielder.next(initValue), accumulator); + } + + @Override + public boolean isDone() + { + return yielder.isDone(); + } + + @Override + public void close() throws IOException + { + yielder.close(); + } + }; + } + + private class SkippingYieldingAccumulator extends DelegatingYieldingAccumulator + { + private long skipped = 0; + + public SkippingYieldingAccumulator(final YieldingAccumulator accumulator) + { + super(accumulator); + } + + @Override + public OutType accumulate(OutType accumulated, ScanResultValue result) + { + if (skipped < skip) { + final long toSkip = skip - skipped; + final List rows = (List) result.getEvents(); + if (toSkip >= rows.size()) { + // Skip everything. + skipped += rows.size(); + return accumulated; + } else { + // Skip partially. 
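+          // For example: with skip = 5, skipped = 3, and a 4-row batch, toSkip = 2, so the first two rows
+          // of this batch are dropped and the remaining two are passed downstream.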
+ final List newEvents = rows.stream().skip(toSkip).collect(Collectors.toList()); + skipped += toSkip; + return super.accumulate( + accumulated, + new ScanResultValue(result.getSegmentId(), result.getColumns(), newEvents) + ); + } + } else { + return super.accumulate(accumulated, result); + } + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 21550e1280f..98e47f7252b 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.GenericQueryMetricsFactory; import org.apache.druid.query.Query; import org.apache.druid.query.QueryMetrics; -import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.aggregation.MetricManipulationFn; @@ -69,27 +68,56 @@ public class ScanQueryQueryToolChest extends QueryToolChest { // Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it // the same way, even if they have different default legacy values. - final ScanQuery scanQuery = ((ScanQuery) (queryPlus.getQuery())) - .withNonNullLegacy(scanQueryConfig); - final QueryPlus queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); - if (scanQuery.getScanRowsLimit() == Long.MAX_VALUE) { - return runner.run(queryPlusWithNonNullLegacy, responseContext); - } - return new BaseSequence<>( - new BaseSequence.IteratorMaker() - { - @Override - public ScanQueryLimitRowIterator make() - { - return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); - } + // + // Also, remove "offset" and add it to the "limit" (we won't push the offset down, just apply it here, at the + // merge at the top of the stack). + final ScanQuery originalQuery = ((ScanQuery) (queryPlus.getQuery())); - @Override - public void cleanup(ScanQueryLimitRowIterator iterFromMake) + final long newLimit; + if (!originalQuery.isLimited()) { + // Unlimited stays unlimited. 
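+      // (The offset is still applied below via ScanQueryOffsetSequence; only the pushed-down limit is unbounded.)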
+ newLimit = Long.MAX_VALUE; + } else if (originalQuery.getScanRowsLimit() > Long.MAX_VALUE - originalQuery.getScanRowsOffset()) { + throw new ISE( + "Cannot apply limit[%d] with offset[%d] due to overflow", + originalQuery.getScanRowsLimit(), + originalQuery.getScanRowsOffset() + ); + } else { + newLimit = originalQuery.getScanRowsLimit() + originalQuery.getScanRowsOffset(); + } + + final ScanQuery queryToRun = originalQuery.withNonNullLegacy(scanQueryConfig) + .withOffset(0) + .withLimit(newLimit); + + final Sequence results; + + if (!queryToRun.isLimited()) { + results = runner.run(queryPlus.withQuery(queryToRun), responseContext); + } else { + results = new BaseSequence<>( + new BaseSequence.IteratorMaker() { - CloseQuietly.close(iterFromMake); - } - }); + @Override + public ScanQueryLimitRowIterator make() + { + return new ScanQueryLimitRowIterator(runner, queryPlus.withQuery(queryToRun), responseContext); + } + + @Override + public void cleanup(ScanQueryLimitRowIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + }); + } + + if (originalQuery.getScanRowsOffset() > 0) { + return new ScanQueryOffsetSequence(results, originalQuery.getScanRowsOffset()); + } else { + return results; + } }; } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 52066ab1f25..be164dd8f0f 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -20,9 +20,10 @@ package org.apache.druid.query.scan; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import com.google.inject.Inject; +import org.apache.druid.collections.StableLimitingSorter; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.Pair; @@ -30,7 +31,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; @@ -47,14 +48,11 @@ import org.apache.druid.segment.Segment; import org.joda.time.Interval; import java.io.IOException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.Deque; import java.util.LinkedHashMap; import java.util.List; -import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -124,7 +122,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory input.run(queryPlus, responseContext) @@ -204,14 +202,18 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory priorityQueueSortAndLimit( + Sequence stableLimitingSort( Sequence inputSequence, ScanQuery scanQuery, List intervalsOrdered ) throws IOException { - Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); + Comparator comparator = scanQuery.getResultOrdering(); if (scanQuery.getScanRowsLimit() > 
Integer.MAX_VALUE) { throw new UOE( @@ -224,20 +226,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory q = new PriorityQueue<>(limit, priorityQComparator); + final StableLimitingSorter sorter = new StableLimitingSorter<>(comparator, limit); - Yielder yielder = inputSequence.toYielder( - null, - new YieldingAccumulator() - { - @Override - public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in) - { - yield(); - return in; - } - } - ); + Yielder yielder = Yielders.each(inputSequence); try { boolean doneScanning = yielder.isDone(); @@ -251,10 +242,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory limit) { - q.poll(); - } + sorter.add(srv); // Finish scanning the interval containing the limit row if (numRowsScanned > limit && finalInterval == null) { @@ -265,7 +253,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory sortedElements = new ArrayDeque<>(q.size()); - while (q.size() != 0) { - // addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first. - sortedElements.addFirst(q.poll()); - } + + final List sortedElements = new ArrayList<>(sorter.size()); + Iterators.addAll(sortedElements, sorter.drain()); return Sequences.simple(sortedElements); } finally { @@ -343,9 +327,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory seq, - Ordering.from(new ScanResultValueTimestampComparator( - (ScanQuery) queryPlus.getQuery() - )).reverse() + queryPlus.getQuery().getResultOrdering() ) ) ); diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java index 69f780fca70..d23cbba2d45 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java @@ -45,7 +45,7 @@ public class ScanResultValueTimestampComparator implements Comparator implements StorageAdapter @Override public int getNumRows() { + if (rowIterable instanceof Collection) { + return ((Collection) rowIterable).size(); + } + + // getNumRows is only used by tests and by segmentMetadataQuery (which would be odd to call on inline datasources) + // so no big deal if it doesn't always work. 
throw new UnsupportedOperationException("Cannot retrieve number of rows"); } diff --git a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java index 3b5a9252d0f..a579df09adf 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.common.io.CharSource; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.druid.common.config.NullHandlingTest; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; @@ -38,7 +39,6 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.spec.LegacySegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.IncrementalIndexSegment; @@ -65,7 +65,7 @@ import java.util.List; * */ @RunWith(Parameterized.class) -public class MultiSegmentScanQueryTest +public class MultiSegmentScanQueryTest extends NullHandlingTest { private static final ScanQueryQueryToolChest TOOL_CHEST = new ScanQueryQueryToolChest( new ScanQueryConfig(), @@ -164,37 +164,41 @@ public class MultiSegmentScanQueryTest IOUtils.closeQuietly(segment1); } - @Parameterized.Parameters(name = "limit={0},batchSize={1}") + @Parameterized.Parameters(name = "limit={0},offset={1},batchSize={2}") public static Iterable constructorFeeder() { return QueryRunnerTestHelper.cartesian( Arrays.asList(0, 1, 3, 7, 10, 20, 1000), + Arrays.asList(0, 1, 3, 5, 7, 10, 20, 200, 1000), Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000) ); } private final int limit; + private final int offset; private final int batchSize; - public MultiSegmentScanQueryTest(int limit, int batchSize) + public MultiSegmentScanQueryTest(int limit, int offset, int batchSize) { this.limit = limit; + this.offset = offset; this.batchSize = batchSize; } private Druids.ScanQueryBuilder newBuilder() { return Druids.newScanQueryBuilder() - .dataSource(new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE)) - .intervals(I_0112_0114_SPEC) - .batchSize(batchSize) - .columns(Collections.emptyList()) - .legacy(false) - .limit(limit); + .dataSource(new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE)) + .intervals(I_0112_0114_SPEC) + .batchSize(batchSize) + .columns(Collections.emptyList()) + .legacy(false) + .limit(limit) + .offset(offset); } @Test - public void testMergeRunnersWithLimit() + public void testMergeRunnersWithLimitAndOffset() { ScanQuery query = newBuilder().build(); List results = FACTORY @@ -216,26 +220,18 @@ public class MultiSegmentScanQueryTest } @Test - public void testMergeResultsWithLimit() + public void testMergeResultsWithLimitAndOffset() { QueryRunner runner = TOOL_CHEST.mergeResults( - new QueryRunner() - { - @Override - public Sequence run( - QueryPlus queryPlus, - ResponseContext responseContext - ) - { - // simulate results back from 2 historicals - List> sequences = Lists.newArrayListWithExpectedSize(2); - sequences.add(FACTORY.createRunner(segment0).run(queryPlus)); - 
sequences.add(FACTORY.createRunner(segment1).run(queryPlus)); - return new MergeSequence<>( - queryPlus.getQuery().getResultOrdering(), - Sequences.simple(sequences) - ); - } + (queryPlus, responseContext) -> { + // simulate results back from 2 historicals + List> sequences = Lists.newArrayListWithExpectedSize(2); + sequences.add(FACTORY.createRunner(segment0).run(queryPlus)); + sequences.add(FACTORY.createRunner(segment1).run(queryPlus)); + return new MergeSequence<>( + queryPlus.getQuery().getResultOrdering(), + Sequences.simple(sequences) + ); } ); ScanQuery query = newBuilder().build(); @@ -246,7 +242,12 @@ public class MultiSegmentScanQueryTest } Assert.assertEquals( totalCount, - limit != 0 ? Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length + Math.max( + 0, + limit != 0 + ? Math.min(limit, V_0112.length + V_0113.length - offset) + : V_0112.length + V_0113.length - offset + ) ); } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryOffsetSequenceTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryOffsetSequenceTest.java new file mode 100644 index 00000000000..4bededd0cf8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryOffsetSequenceTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.guava.Sequences; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class ScanQueryOffsetSequenceTest +{ + private final List> rowList1 = ImmutableList.of( + ImmutableList.of(1, 2), + ImmutableList.of(3, 4), + ImmutableList.of(5, 6) + ); + + private final List> rowList2 = ImmutableList.of( + ImmutableList.of(7, 8), + ImmutableList.of(9, 10), + ImmutableList.of(11, 12) + ); + + @Test + public void testSkip() + { + final List unskipped = makeExpectedResults(0); + + for (int skip = 1; skip <= rowList1.size() + rowList2.size() + 1; skip++) { + final List expected = makeExpectedResults(skip); + final List resultsAfterSkip = new ScanQueryOffsetSequence( + Sequences.simple(unskipped), + skip + ).toList(); + + Assert.assertEquals("skip = " + skip, expected, resultsAfterSkip); + } + } + + /** + * Return a list of 0, 1, or 2 {@link ScanResultValue} based on the "skip" parameter. Rows are taken from + * {@link #rowList1} and {@link #rowList2}. + */ + private List makeExpectedResults(final int skip) + { + final List expected; + + if (skip < rowList1.size()) { + // Somewhere inside the first ScanResultValue. 
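+      // For example, skip = 2 leaves the last row of rowList1 plus all of rowList2.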
+ expected = ImmutableList.of( + new ScanResultValue( + "1", + ImmutableList.of("a", "b"), + rowList1.subList(skip, rowList1.size()) + ), + new ScanResultValue( + "2", + ImmutableList.of("b", "c"), + rowList2 + ) + ); + } else if (skip < rowList1.size() + rowList2.size()) { + // Somewhere inside the second ScanResultValue. + expected = ImmutableList.of( + new ScanResultValue( + "2", + ImmutableList.of("b", "c"), + rowList2.subList(skip - rowList1.size(), rowList2.size()) + ) + ); + } else { + // Past the second ScanResultValue. + expected = ImmutableList.of(); + } + + return expected; + } +} diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java new file mode 100644 index 00000000000..e1bbc71d29b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Sets; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.MergeSequence; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.RowBasedSegment; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.function.Function; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; 
+import java.util.stream.IntStream; + +/** + * Tests the order in which Scan query results come back. + * + * Ensures that we have run-to-run stability of result order, which is important for offset-based pagination. + */ +@RunWith(Parameterized.class) +public class ScanQueryResultOrderingTest +{ + private static final String DATASOURCE = "datasource"; + private static final String ID_COLUMN = "id"; + + private static final RowAdapter ROW_ADAPTER = new RowAdapter() + { + @Override + public ToLongFunction timestampFunction() + { + return row -> ((DateTime) row[0]).getMillis(); + } + + @Override + public Function columnFunction(String columnName) + { + if (ID_COLUMN.equals(columnName)) { + return row -> row[1]; + } else if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) { + return timestampFunction()::applyAsLong; + } else { + return row -> null; + } + } + }; + + private static final RowSignature ROW_SIGNATURE = RowSignature.builder() + .addTimeColumn() + .add(ID_COLUMN, ValueType.LONG) + .build(); + + private static final List> SEGMENTS = ImmutableList.of( + new RowBasedSegment<>( + SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 0), + ImmutableList.of( + new Object[]{DateTimes.of("2000T01"), 101}, + new Object[]{DateTimes.of("2000T01"), 80}, + new Object[]{DateTimes.of("2000T01"), 232}, + new Object[]{DateTimes.of("2000T01"), 12}, + new Object[]{DateTimes.of("2000T02"), 808}, + new Object[]{DateTimes.of("2000T02"), 411}, + new Object[]{DateTimes.of("2000T02"), 383}, + new Object[]{DateTimes.of("2000T05"), 22} + ), + ROW_ADAPTER, + ROW_SIGNATURE + ), + new RowBasedSegment<>( + SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 1), + ImmutableList.of( + new Object[]{DateTimes.of("2000T01"), 333}, + new Object[]{DateTimes.of("2000T01"), 222}, + new Object[]{DateTimes.of("2000T01"), 444}, + new Object[]{DateTimes.of("2000T01"), 111}, + new Object[]{DateTimes.of("2000T03"), 555}, + new Object[]{DateTimes.of("2000T03"), 999}, + new Object[]{DateTimes.of("2000T03"), 888}, + new Object[]{DateTimes.of("2000T05"), 777} + ), + ROW_ADAPTER, + ROW_SIGNATURE + ), + new RowBasedSegment<>( + SegmentId.of(DATASOURCE, Intervals.of("2000-01-02/P1D"), "1", 0), + ImmutableList.of( + new Object[]{DateTimes.of("2000-01-02T00"), 7}, + new Object[]{DateTimes.of("2000-01-02T02"), 9}, + new Object[]{DateTimes.of("2000-01-02T03"), 8} + ), + ROW_ADAPTER, + ROW_SIGNATURE + ) + ); + + private final List segmentToServerMap; + private final int limit; + private final int batchSize; + private final int maxRowsQueuedForOrdering; + + private ScanQueryRunnerFactory queryRunnerFactory; + private List> segmentRunners; + + @Parameterized.Parameters(name = "Segment-to-server map[{0}], limit[{1}], batchSize[{2}], maxRowsQueuedForOrdering[{3}]") + public static Iterable constructorFeeder() + { + // Set number of server equal to number of segments, then try all possible distributions of segments to servers. + final int numServers = SEGMENTS.size(); + + final Set> segmentToServerMaps = Sets.cartesianProduct( + IntStream.range(0, SEGMENTS.size()) + .mapToObj(i -> IntStream.range(0, numServers).boxed().collect(Collectors.toSet())) + .collect(Collectors.toList()) + ); + + // Try every limit up to one past the total number of rows. + final Set limits = new TreeSet<>(); + final int totalNumRows = SEGMENTS.stream().mapToInt(s -> s.asStorageAdapter().getNumRows()).sum(); + for (int i = 0; i <= totalNumRows + 1; i++) { + limits.add(i); + } + + // Try various batch sizes. 
+ final Set batchSizes = ImmutableSortedSet.of(1, 2, 100); + final Set maxRowsQueuedForOrderings = ImmutableSortedSet.of(1, 7, 100000); + + return Sets.cartesianProduct( + segmentToServerMaps, + limits, + batchSizes, + maxRowsQueuedForOrderings + ).stream().map(args -> args.toArray(new Object[0])).collect(Collectors.toList()); + } + + public ScanQueryResultOrderingTest( + final List segmentToServerMap, + final int limit, + final int batchSize, + final int maxRowsQueuedForOrdering + ) + { + this.segmentToServerMap = segmentToServerMap; + this.limit = limit; + this.batchSize = batchSize; + this.maxRowsQueuedForOrdering = maxRowsQueuedForOrdering; + } + + @Before + public void setUp() + { + queryRunnerFactory = new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ); + + segmentRunners = SEGMENTS.stream().map(queryRunnerFactory::createRunner).collect(Collectors.toList()); + } + + @Test + public void testOrderNone() + { + assertResultsEquals( + Druids.newScanQueryBuilder() + .dataSource("ds") + .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D")))) + .columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN) + .order(ScanQuery.Order.NONE) + .build(), + ImmutableList.of( + 101, + 80, + 232, + 12, + 808, + 411, + 383, + 22, + 333, + 222, + 444, + 111, + 555, + 999, + 888, + 777, + 7, + 9, + 8 + ) + ); + } + + @Test + public void testOrderTimeAscending() + { + assertResultsEquals( + Druids.newScanQueryBuilder() + .dataSource("ds") + .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D")))) + .columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN) + .order(ScanQuery.Order.ASCENDING) + .build(), + ImmutableList.of( + 101, + 80, + 232, + 12, + 333, + 222, + 444, + 111, + 808, + 411, + 383, + 555, + 999, + 888, + 22, + 777, + 7, + 9, + 8 + ) + ); + } + + @Test + public void testOrderTimeDescending() + { + assertResultsEquals( + Druids.newScanQueryBuilder() + .dataSource("ds") + .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D")))) + .columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN) + .order(ScanQuery.Order.DESCENDING) + .build(), + ImmutableList.of( + 8, + 9, + 7, + 777, + 22, + 888, + 999, + 555, + 383, + 411, + 808, + 111, + 444, + 222, + 333, + 12, + 232, + 80, + 101 + ) + ); + } + + private void assertResultsEquals(final ScanQuery query, final List expectedResults) + { + final List>>> serverRunners = new ArrayList<>(); + for (int i = 0; i <= segmentToServerMap.stream().max(Comparator.naturalOrder()).orElse(0); i++) { + serverRunners.add(new ArrayList<>()); + } + + for (int segmentNumber = 0; segmentNumber < segmentToServerMap.size(); segmentNumber++) { + final SegmentId segmentId = SEGMENTS.get(segmentNumber).getId(); + final int serverNumber = segmentToServerMap.get(segmentNumber); + + serverRunners.get(serverNumber).add(Pair.of(segmentId, segmentRunners.get(segmentNumber))); + } + + // Simulates what the Historical servers would do. 
+ final List> mergedServerRunners = + serverRunners.stream() + .filter(runners -> !runners.isEmpty()) + .map( + runners -> + queryRunnerFactory.getToolchest().mergeResults( + new QueryRunner() + { + @Override + public Sequence run( + final QueryPlus queryPlus, + final ResponseContext responseContext + ) + { + return queryRunnerFactory.mergeRunners( + Execs.directExecutor(), + runners.stream().map(p -> p.rhs).collect(Collectors.toList()) + ).run( + queryPlus.withQuery( + queryPlus.getQuery() + .withQuerySegmentSpec( + new MultipleSpecificSegmentSpec( + runners.stream() + .map(p -> p.lhs.toDescriptor()) + .collect(Collectors.toList()) + ) + ) + ), + responseContext + ); + } + } + ) + ) + .collect(Collectors.toList()); + + // Simulates what the Broker would do. + final QueryRunner brokerRunner = queryRunnerFactory.getToolchest().mergeResults( + (queryPlus, responseContext) -> { + final List> sequences = + mergedServerRunners.stream() + .map(runner -> runner.run(queryPlus.withoutThreadUnsafeState())) + .collect(Collectors.toList()); + + return new MergeSequence<>( + queryPlus.getQuery().getResultOrdering(), + Sequences.simple(sequences) + ); + } + ); + + // Finally: run the query. + final List results = runQuery( + (ScanQuery) Druids.ScanQueryBuilder.copy(query) + .limit(limit) + .batchSize(batchSize) + .build() + .withOverriddenContext( + ImmutableMap.of( + ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING, + maxRowsQueuedForOrdering + ) + ), + brokerRunner + ); + + Assert.assertEquals( + expectedResults.stream().limit(limit == 0 ? Long.MAX_VALUE : limit).collect(Collectors.toList()), + results + ); + } + + private List runQuery(final ScanQuery query, final QueryRunner brokerRunner) + { + final List results = queryRunnerFactory.getToolchest().resultsAsArrays( + query, + brokerRunner.run(QueryPlus.wrap(query)) + ).toList(); + + return results.stream().mapToInt(row -> (int) row[1]).boxed().collect(Collectors.toList()); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index 801324847d7..62e2923b26a 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -138,7 +138,7 @@ public class ScanQueryRunnerFactoryTest }); Sequence inputSequence = Sequences.simple(srvs); try { - List output = FACTORY.priorityQueueSortAndLimit( + List output = FACTORY.stableLimitingSort( inputSequence, query, ImmutableList.of(new Interval( diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java index 661960b609f..f609b283f5e 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java @@ -22,6 +22,7 @@ package org.apache.druid.query.scan; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.spec.LegacySegmentSpec; @@ -36,7 +37,7 @@ public class ScanQuerySpecTest private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); @Test - public void 
testSerializationLegacyString() throws Exception + public void testSerialization() throws Exception { String legacy = "{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"}," @@ -67,6 +68,7 @@ public class ScanQuerySpecTest VirtualColumns.EMPTY, ScanQuery.ResultFormat.RESULT_FORMAT_LIST, 0, + 0, 3, ScanQuery.Order.NONE, null, @@ -80,4 +82,27 @@ public class ScanQuerySpecTest Assert.assertEquals(query, JSON_MAPPER.readValue(actual, ScanQuery.class)); Assert.assertEquals(query, JSON_MAPPER.readValue(legacy, ScanQuery.class)); } + + @Test + public void testSerializationLegacyString() throws Exception + { + ScanQuery query = new ScanQuery( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")), + VirtualColumns.EMPTY, + ScanQuery.ResultFormat.RESULT_FORMAT_LIST, + 0, + 1, + 3, + ScanQuery.Order.NONE, + null, + Arrays.asList("market", "quality", "index"), + null, + null + ); + + final String serialized = JSON_MAPPER.writeValueAsString(query); + final ScanQuery deserialized = (ScanQuery) JSON_MAPPER.readValue(serialized, Query.class); + Assert.assertEquals(query, deserialized); + } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java index 465794a2831..47cc4208640 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java @@ -82,7 +82,7 @@ public class ScanResultValueTimestampComparatorTest events2 ); - Assert.assertEquals(-1, comparator.compare(s1, s2)); + Assert.assertEquals(1, comparator.compare(s1, s2)); } @Test @@ -119,7 +119,7 @@ public class ScanResultValueTimestampComparatorTest events2 ); - Assert.assertEquals(1, comparator.compare(s1, s2)); + Assert.assertEquals(-1, comparator.compare(s1, s2)); } @Test @@ -154,7 +154,7 @@ public class ScanResultValueTimestampComparatorTest events2 ); - Assert.assertEquals(-1, comparator.compare(s1, s2)); + Assert.assertEquals(1, comparator.compare(s1, s2)); } @Test @@ -189,6 +189,6 @@ public class ScanResultValueTimestampComparatorTest events2 ); - Assert.assertEquals(1, comparator.compare(s1, s2)); + Assert.assertEquals(-1, comparator.compare(s1, s2)); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 9c1f40c94f1..45c29d1787a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -995,6 +995,7 @@ public class DruidQuery getVirtualColumns(true), ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST, 0, + 0, scanLimit, order, filtration.getDimFilter(), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index e7b537edc97..47d7777674a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1059,7 +1059,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ImmutableList.of(), ImmutableList.of( new Object[]{ - 
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" + "DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" } ) ); @@ -7403,7 +7403,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest final String explanation = "DruidOuterQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"__subquery__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"limit\":2147483647,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"skipEmptyBuckets\":true,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"}}], signature=[{a0:LONG}])\n" + " DruidJoinQueryRel(condition=[=(SUBSTRING($3, 1, 1), $8)], joinType=[inner], query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"__join__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim2\",\"outputName\":\"d0\",\"outputType\":\"STRING\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n" - + " 
DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" + + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" + " DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":null,\"extractionFn\":null}},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"extraction\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\",\"extractionFn\":{\"type\":\"substring\",\"index\":0,\"length\":1}}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n"; testQuery(