Time Ordering On Scans (#7133)

* Moved Scan Builder to Druids class and started on Scan Benchmark setup

* Need to form queries

* It runs.

* Stuff for time-ordered scan query

* Move ScanResultValue timestamp comparator to a separate class for testing

* Licensing stuff

* Change benchmark

* Remove todos

* Added TimestampComparator tests

* Change number of benchmark iterations

* Added time ordering to the scan benchmark

* Changed benchmark params

* More param changes

* Benchmark param change

* Made Jon's changes and removed TODOs

* Broke some long lines into two lines

* nit

* Decrease segment size for less memory usage

* Wrote tests for heapsort scan result values and fixed bug where iterator
wasn't returning elements in correct order

* Wrote more tests for scan result value sort

* Committing a param change to kick teamcity

* Fixed codestyle and forbidden API errors

* .

* Improved conciseness

* nit

* Created an error message for when someone tries to time order a result
set > threshold limit

* Set to spaces over tabs

* Fixing tests WIP

* Fixed failing calcite tests

* Kicking travis with change to benchmark param

* added all query types to scan benchmark

* Fixed benchmark queries

* Renamed sort function

* Added javadoc on ScanResultValueTimestampComparator

* Unused import

* Added more javadoc

* improved doc

* Removed unused import to satisfy PMD check

* Small changes

* Changes based on Gian's comments

* Fixed failing test due to null resultFormat

* Added config and get # of segments

* Set up time ordering strategy decision tree

* Refactor and pQueue works

* Cleanup

* Ordering is correct on n-way merge -> still need to batch events into
ScanResultValues

* WIP

* Sequence stuff is so dirty :(

* Fixed bug introduced by replacing deque with list

* Wrote docs

* Multi-historical setup works

* WIP

* Change so batching only occurs on broker for time-ordered scans

Restricted batching to broker for time-ordered queries and adjusted
tests

Formatting

Cleanup

* Fixed mistakes in merge

* Fixed failing tests

* Reset config

* Wrote tests and added Javadoc

* Nit-change on javadoc

* Checkstyle fix

* Improved test and appeased TeamCity

* Sorry, checkstyle

* Applied Jon's recommended changes

* Checkstyle fix

* Optimization

* Fixed tests

* Updated error message

* Added error message for UOE

* Renaming

* Finish rename

* Smarter limiting for pQueue method

* Optimized n-way merge strategy

* Rename segment limit -> segment partitions limit

* Added a bit of docs

* More comments

* Fix checkstyle and test

* Nit comment

* Fixed failing tests -> allow usage of all types of segment spec

* Fixed failing tests -> allow usage of all types of segment spec

* Revert "Fixed failing tests -> allow usage of all types of segment spec"

This reverts commit ec470288c7.

* Revert "Merge branch '6088-Time-Ordering-On-Scans-N-Way-Merge' of github.com:justinborromeo/incubator-druid into 6088-Time-Ordering-On-Scans-N-Way-Merge"

This reverts commit 57033f36df, reversing
changes made to 8f01d8dd16.

* Check type of segment spec before using for time ordering

* Fix bug in numRowsScanned

* Fix bug messing up count of rows

* Fix docs and flipped boolean in ScanQueryLimitRowIterator

* Refactor n-way merge

* Added test for n-way merge

* Refixed regression

* Checkstyle and doc update

* Modified sequence limit to accept longs and added test for long limits

* doc fix

* Implemented Clint's recommendations
Justin Borromeo 2019-03-28 14:37:09 -07:00 committed by Jonathan Wei
parent ffa95859c2
commit ad7862c58a
31 changed files with 1862 additions and 197 deletions

View File

@ -107,12 +107,15 @@ public class ScanBenchmark
@Param({"200000"})
private int rowsPerSegment;
@Param({"basic.A"})
@Param({"basic.A", "basic.B", "basic.C", "basic.D"})
private String schemaAndQuery;
@Param({"1000", "99999"})
private int limit;
@Param({"NONE", "DESCENDING", "ASCENDING"})
private static ScanQuery.Order ordering;
private static final Logger log = new Logger(ScanBenchmark.class);
private static final ObjectMapper JSON_MAPPER;
private static final IndexMergerV9 INDEX_MERGER_V9;
@ -178,7 +181,8 @@ public class ScanBenchmark
return Druids.newScanQueryBuilder()
.dataSource("blah")
.intervals(intervalSpec);
.intervals(intervalSpec)
.order(ordering);
}
private static Druids.ScanQueryBuilder basicB(final BenchmarkSchemaInfo basicSchema)
@ -197,7 +201,9 @@ public class ScanBenchmark
return Druids.newScanQueryBuilder()
.filters(filter)
.intervals(intervalSpec);
.dataSource("blah")
.intervals(intervalSpec)
.order(ordering);
}
private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema)
@ -208,7 +214,9 @@ public class ScanBenchmark
final String dimName = "dimUniform";
return Druids.newScanQueryBuilder()
.filters(new SelectorDimFilter(dimName, "3", StrlenExtractionFn.instance()))
.intervals(intervalSpec);
.intervals(intervalSpec)
.dataSource("blah")
.order(ordering);
}
private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema)
@ -221,7 +229,9 @@ public class ScanBenchmark
return Druids.newScanQueryBuilder()
.filters(new BoundDimFilter(dimName, "100", "10000", true, true, true, null, null))
.intervals(intervalSpec);
.intervals(intervalSpec)
.dataSource("blah")
.order(ordering);
}
@Setup
@ -289,7 +299,8 @@ public class ScanBenchmark
config,
DefaultGenericQueryMetricsFactory.instance()
),
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
);
}

View File

@ -30,11 +30,11 @@ import java.io.IOException;
final class LimitedSequence<T> extends YieldingSequenceBase<T>
{
private final Sequence<T> baseSequence;
private final int limit;
private final long limit;
LimitedSequence(
Sequence<T> baseSequence,
int limit
long limit
)
{
Preconditions.checkNotNull(baseSequence);
@ -106,7 +106,7 @@ final class LimitedSequence<T> extends YieldingSequenceBase<T>
private class LimitedYieldingAccumulator<OutType, T> extends DelegatingYieldingAccumulator<OutType, T>
{
int count;
long count;
boolean interruptYield = false;
LimitedYieldingAccumulator(YieldingAccumulator<OutType, T> accumulator)

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.PriorityQueue;
/**
* Used to perform an n-way merge on n ordered sequences
*/
public class MergeSequence<T> extends YieldingSequenceBase<T>
{
@ -42,20 +43,18 @@ public class MergeSequence<T> extends YieldingSequenceBase<T>
this.baseSequences = (Sequence<? extends Sequence<T>>) baseSequences;
}
/*
Note: the yielder for MergeSequence returns elements from the priority queue in order of increasing priority.
This is due to the fact that PriorityQueue#remove() polls from the head of the queue which is, according to
the PriorityQueue javadoc, "the least element with respect to the specified ordering"
*/
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
PriorityQueue<Yielder<T>> pQueue = new PriorityQueue<>(
32,
ordering.onResultOf(
new Function<Yielder<T>, T>()
{
@Override
public T apply(Yielder<T> input)
{
return input.get();
}
}
(Function<Yielder<T>, T>) input -> input.get()
)
);
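
The comment above explains why the merged output comes back in order of increasing priority. As a quick illustration of that n-way merge behaviour, here is a minimal sketch (not part of this patch) that merges two already-sorted sequences; it assumes the two-argument MergeSequence constructor (an ordering plus a sequence of sequences) visible in this file.

```java
import com.google.common.collect.Ordering;
import java.util.Arrays;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;

public class MergeSequenceSketch
{
  public static void main(String[] args)
  {
    // Each input sequence must already be sorted by the supplied ordering.
    Sequence<Integer> a = Sequences.simple(Arrays.asList(1, 3, 5));
    Sequence<Integer> b = Sequences.simple(Arrays.asList(2, 4, 6));
    Sequence<Integer> merged = new MergeSequence<>(
        Ordering.<Integer>natural(),
        Sequences.simple(Arrays.asList(a, b))
    );
    // The priority queue always yields its head (the least element), so the merge is ascending.
    System.out.println(merged.toList()); // [1, 2, 3, 4, 5, 6]
  }
}
```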

View File

@ -54,13 +54,13 @@ public interface Sequence<T>
<OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);
/**
* Return an Yielder for accumulated sequence.
* Return a Yielder for accumulated sequence.
*
* @param initValue the initial value to pass along to start the accumulation.
* @param accumulator the accumulator which is responsible for accumulating input values.
* @param <OutType> the type of accumulated value.
*
* @return an Yielder for accumulated sequence.
* @return a Yielder for accumulated sequence.
*
* @see Yielder
*/
@ -72,6 +72,8 @@ public interface Sequence<T>
}
/**
* This will materialize the entire sequence. Use at your own risk.
*
* Several benchmarks rely on this method to eagerly accumulate Sequences to ArrayLists. e.g.
* GroupByBenchmark.
*/
@ -80,7 +82,7 @@ public interface Sequence<T>
return accumulate(new ArrayList<>(), Accumulators.list());
}
default Sequence<T> limit(int limit)
default Sequence<T> limit(long limit)
{
return new LimitedSequence<>(this, limit);
}
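
The widened `limit(long)` default method above (together with the LimitedSequence change earlier in this diff) lets callers pass a row limit that no longer has to fit in an int. A minimal sketch, not part of the patch, of what that looks like from the caller's side:

```java
import java.util.Arrays;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;

public class LimitSketch
{
  public static void main(String[] args)
  {
    Sequence<String> rows = Sequences.simple(Arrays.asList("a", "b", "c", "d"));
    // The limit parameter is now a long, so a ScanQuery limit (which defaults to Long.MAX_VALUE)
    // can be passed straight through without truncating it to an int first.
    Sequence<String> limited = rows.limit(2L);
    System.out.println(limited.toList()); // [a, b]
  }
}
```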

View File

@ -24,7 +24,16 @@ title: "Scan query"
# Scan query
Scan query returns raw Druid rows in streaming mode.
The Scan query returns raw Druid rows in streaming mode. The biggest difference between the Select query and the Scan
query is that the Scan query does not retain all the returned rows in memory before they are returned to the client.
The Select query _will_ retain the rows in memory, causing memory pressure if too many rows are returned.
The Scan query can return all the rows without issuing another pagination query.
In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued
directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large
amounts of data in parallel.
An example Scan query object is shown below:
```json
{
@ -36,28 +45,29 @@ Scan query returns raw Druid rows in streaming mode.
"2013-01-01/2013-01-02"
],
"batchSize":20480,
"limit":5
"limit":3
}
```
There are several main parts to a scan query:
The following are the main parameters for Scan queries:
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|resultFormat|How result represented, list or compactedList or valueVector. Currently only `list` and `compactedList` are supported. Default is `list`|no|
|resultFormat|How the results are represented: list, compactedList or valueVector. Currently only `list` and `compactedList` are supported. Default is `list`|no|
|filter|See [Filters](../querying/filters.html)|no|
|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
|batchSize|How many rows are buffered before being returned to the client. Default is `20480`|no|
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either in legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and will default to an order of "none".|no|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags.|no|
## Example results
The format of the result when resultFormat equals to `list`:
The format of the result when resultFormat equals `list`:
```json
[{
@ -123,41 +133,11 @@ The format of the result when resultFormat equals to `list`:
"delta" : 77.0,
"variation" : 77.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._73",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 70.0,
"delta" : 70.0,
"variation" : 70.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._756",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 68.0,
"delta" : 68.0,
"variation" : 68.0,
"deleted" : 0.0
} ]
} ]
```
The format of the result when resultFormat equals to `compactedList`:
The format of the result when resultFormat equals `compactedList`:
```json
[{
@ -168,17 +148,38 @@ The format of the result when resultFormat equals to `compactedList`:
"events" : [
["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._73", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._756", "en", "1", "MZMcBride", 1.0, 68.0, 68.0, 68.0, 0.0]
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0]
]
} ]
```
The biggest difference between select query and scan query is that, scan query doesn't retain all rows in memory before rows can be returned to client.
It will cause memory pressure if too many rows required by select query.
Scan query doesn't have this issue.
Scan query can return all rows without issuing another pagination query, which is extremely useful when query against Historical or realtime process directly.
## Time Ordering
The Scan query currently supports ordering based on timestamp for non-legacy queries. Note that using time ordering
will yield results that do not indicate which segment rows are from (`segmentId` will show up as `null`). Furthermore,
time ordering is only supported where the result set limit is less than `druid.query.scan.maxRowsQueuedForOrdering`
rows **or** all segments scanned have fewer than `druid.query.scan.maxSegmentPartitionsOrderedInMemory` partitions. Also,
time ordering is not supported for queries issued directly to Historicals unless a list of segments is specified. The
reasoning behind these limitations is that the implementation of time ordering uses two strategies that can consume too
much heap memory if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on
the query result set limit and the number of segments being scanned.
1. Priority Queue: Each segment on a Historical is opened sequentially. Every row is added to a bounded priority
queue which is ordered by timestamp. For every row above the result set limit, the row with the earliest (if descending)
or latest (if ascending) timestamp will be dequeued. After every row has been processed, the sorted contents of the
priority queue are streamed back to the Broker(s) in batches. Attempting to load too many rows into memory runs the
risk of Historical nodes running out of memory. The `druid.query.scan.maxRowsQueuedForOrdering` property protects
against this by limiting the number of rows in the query result set when time ordering is used.
2. N-Way Merge: For each segment, each partition is opened in parallel. Since each partition's rows are already
time-ordered, an n-way merge can be performed on the results from each partition. Unlike the Priority Queue strategy,
this approach does not persist the entire result set in memory; it streams back batches as they are returned from the
merge function. However, attempting to query too many partitions could also result in high memory usage due to the need
to open decompression and decoding buffers for each. The `druid.query.scan.maxSegmentPartitionsOrderedInMemory` limit
protects against this by capping the number of partitions opened at any time when time ordering is used.
Both `druid.query.scan.maxRowsQueuedForOrdering` and `druid.query.scan.maxSegmentPartitionsOrderedInMemory` are
configurable and can be tuned based on hardware specs and the number of dimensions being queried.
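A time-ordered query is built like any other Scan query with an added `order` value; the JSON query at the top of this page becomes time-ordered by adding `"order": "descending"` (or `"ascending"`). The sketch below, modeled on the tests added in this patch rather than taken from the documentation, shows the equivalent native Java construction; the datasource name and interval are placeholders.

```java
import java.util.Collections;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;

public class TimeOrderedScanSketch
{
  public static void main(String[] args)
  {
    ScanQuery query = Druids.newScanQueryBuilder()
        .dataSource("wikipedia")                    // placeholder datasource
        .intervals(new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.of("2013-01-01/2013-01-02"))))
        .order(ScanQuery.Order.DESCENDING)          // NONE (the default) and ASCENDING are also valid
        .limit(3)                                   // must stay under druid.query.scan.maxRowsQueuedForOrdering
        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
        .build();
    System.out.println(query.getOrder());           // prints "descending"
  }
}
```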
## Legacy mode
@ -194,3 +195,11 @@ Legacy mode can be triggered either by passing `"legacy" : true` in your query J
`druid.query.scan.legacy = true` on your Druid processes. If you were previously using the scan-query contrib extension,
the best way to migrate is to activate legacy mode during a rolling upgrade, then switch it off after the upgrade
is complete.
## Configuration Properties
|property|description|values|default|
|--------|-----------|------|-------|
|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [0, 2147483647]|100000|
|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [0, 2147483647]|50|
|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|

View File

@ -2586,7 +2586,8 @@ public class KafkaIndexTaskTest
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
),
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.build()

View File

@ -918,12 +918,13 @@ public class Druids
private QuerySegmentSpec querySegmentSpec;
private VirtualColumns virtualColumns;
private Map<String, Object> context;
private String resultFormat;
private ScanQuery.ResultFormat resultFormat;
private int batchSize;
private long limit;
private DimFilter dimFilter;
private List<String> columns;
private Boolean legacy;
private ScanQuery.Order order;
public ScanQueryBuilder()
{
@ -937,6 +938,7 @@ public class Druids
dimFilter = null;
columns = new ArrayList<>();
legacy = null;
order = null;
}
public ScanQuery build()
@ -948,6 +950,7 @@ public class Druids
resultFormat,
batchSize,
limit,
order,
dimFilter,
columns,
legacy,
@ -967,7 +970,8 @@ public class Druids
.filters(query.getFilter())
.columns(query.getColumns())
.legacy(query.isLegacy())
.context(query.getContext());
.context(query.getContext())
.order(query.getOrder());
}
public ScanQueryBuilder dataSource(String ds)
@ -1005,7 +1009,7 @@ public class Druids
return this;
}
public ScanQueryBuilder resultFormat(String r)
public ScanQueryBuilder resultFormat(ScanQuery.ResultFormat r)
{
resultFormat = r;
return this;
@ -1046,6 +1050,12 @@ public class Druids
this.legacy = legacy;
return this;
}
public ScanQueryBuilder order(ScanQuery.Order order)
{
this.order = order;
return this;
}
}
public static ScanQueryBuilder newScanQueryBuilder()

View File

@ -21,7 +21,11 @@ package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
@ -36,26 +40,88 @@ import java.util.Objects;
public class ScanQuery extends BaseQuery<ScanResultValue>
{
public static final String RESULT_FORMAT_LIST = "list";
public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList";
public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector";
public enum ResultFormat
{
RESULT_FORMAT_LIST,
RESULT_FORMAT_COMPACTED_LIST,
RESULT_FORMAT_VALUE_VECTOR;
@JsonValue
@Override
public String toString()
{
switch (this) {
case RESULT_FORMAT_LIST:
return "list";
case RESULT_FORMAT_COMPACTED_LIST:
return "compactedList";
case RESULT_FORMAT_VALUE_VECTOR:
return "valueVector";
default:
return "";
}
}
@JsonCreator
public static ResultFormat fromString(String name)
{
switch (name) {
case "compactedList":
return RESULT_FORMAT_COMPACTED_LIST;
case "valueVector":
return RESULT_FORMAT_VALUE_VECTOR;
case "list":
return RESULT_FORMAT_LIST;
default:
throw new UOE("Scan query result format [%s] is not supported.", name);
}
}
}
public enum Order
{
ASCENDING,
DESCENDING,
NONE;
@JsonValue
@Override
public String toString()
{
return StringUtils.toLowerCase(this.name());
}
@JsonCreator
public static Order fromString(String name)
{
return valueOf(StringUtils.toUpperCase(name));
}
}
/**
* This context flag corresponds to whether the query is running on the "outermost" process (i.e. the process
* the query is sent to).
*/
public static final String CTX_KEY_OUTERMOST = "scanOutermost";
private final VirtualColumns virtualColumns;
private final String resultFormat;
private final ResultFormat resultFormat;
private final int batchSize;
private final long limit;
private final DimFilter dimFilter;
private final List<String> columns;
private final Boolean legacy;
private final Order order;
@JsonCreator
public ScanQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("resultFormat") String resultFormat,
@JsonProperty("resultFormat") ResultFormat resultFormat,
@JsonProperty("batchSize") int batchSize,
@JsonProperty("limit") long limit,
@JsonProperty("order") Order order,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("legacy") Boolean legacy,
@ -64,7 +130,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat;
this.resultFormat = (resultFormat == null) ? ResultFormat.RESULT_FORMAT_LIST : resultFormat;
this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
this.limit = (limit == 0) ? Long.MAX_VALUE : limit;
Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
@ -72,6 +138,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
this.dimFilter = dimFilter;
this.columns = columns;
this.legacy = legacy;
this.order = order == null ? Order.NONE : order;
}
@JsonProperty
@ -81,7 +148,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
}
@JsonProperty
public String getResultFormat()
public ResultFormat getResultFormat()
{
return resultFormat;
}
@ -98,6 +165,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return limit;
}
@JsonProperty
public Order getOrder()
{
return order;
}
@Override
public boolean hasFilters()
{
@ -132,6 +205,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return legacy;
}
@Override
public Ordering<ScanResultValue> getResultOrdering()
{
return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse();
}
public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig)
{
return Druids.ScanQueryBuilder.copy(this).legacy(legacy != null ? legacy : scanQueryConfig.isLegacy()).build();
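
Since the new ResultFormat and Order enums are serialized with @JsonValue / @JsonCreator, the strings that appear in query JSON map directly onto enum constants. A small sketch (not part of the patch) of how that round-trip behaves, based on the fromString/toString methods above:

```java
import org.apache.druid.query.scan.ScanQuery;

public class ScanQueryEnumSketch
{
  public static void main(String[] args)
  {
    ScanQuery.ResultFormat format = ScanQuery.ResultFormat.fromString("compactedList");
    ScanQuery.Order order = ScanQuery.Order.fromString("descending");
    System.out.println(format); // "compactedList", via the @JsonValue toString()
    System.out.println(order);  // "descending"
    // An unrecognized result format fails fast with a UOE:
    // ScanQuery.ResultFormat.fromString("vector") -> "Scan query result format [vector] is not supported."
  }
}
```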

View File

@ -40,6 +40,22 @@ public class ScanQueryConfig
return this;
}
@JsonProperty
private int maxRowsQueuedForOrdering = 100000;
public int getMaxRowsQueuedForOrdering()
{
return maxRowsQueuedForOrdering;
}
@JsonProperty
private int maxSegmentPartitionsOrderedInMemory = 50;
public int getMaxSegmentPartitionsOrderedInMemory()
{
return maxSegmentPartitionsOrderedInMemory;
}
@Override
public boolean equals(final Object o)
{

View File

@ -68,7 +68,7 @@ public class ScanQueryEngine
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) {
long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
if (count >= query.getLimit()) {
if (count >= query.getLimit() && query.getOrder().equals(ScanQuery.Order.NONE)) {
return Sequences.empty();
}
}
@ -123,7 +123,7 @@ public class ScanQueryEngine
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L);
}
final long limit = query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
final long limit = calculateLimit(query, responseContext);
return Sequences.concat(
adapter
.makeCursors(
@ -131,7 +131,8 @@ public class ScanQueryEngine
intervals.get(0),
query.getVirtualColumns(),
Granularities.ALL,
query.isDescending(),
query.getOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
null
)
.map(cursor -> new BaseSequence<>(
@ -177,13 +178,13 @@ public class ScanQueryEngine
}
final long lastOffset = offset;
final Object events;
final String resultFormat = query.getResultFormat();
if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
final ScanQuery.ResultFormat resultFormat = query.getResultFormat();
if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
events = rowsToCompactedList();
} else if (ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) {
} else if (ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
events = rowsToList();
} else {
throw new UOE("resultFormat[%s] is not supported", resultFormat);
throw new UOE("resultFormat[%s] is not supported", resultFormat.toString());
}
responseContext.put(
ScanQueryRunnerFactory.CTX_COUNT,
@ -256,4 +257,16 @@ public class ScanQueryEngine
))
);
}
/**
* If we're performing time-ordering, we want to scan through the first `limit` rows in each segment ignoring the number
* of rows already counted on other segments.
*/
private long calculateLimit(ScanQuery query, Map<String, Object> responseContext)
{
if (query.getOrder().equals(ScanQuery.Order.NONE)) {
return query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
}
return query.getLimit();
}
}

View File

@ -19,23 +19,43 @@
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* This iterator supports iteration through a Sequence returned by a ScanResultValue QueryRunner. Its behaviour
* varies depending on whether the query is returning time-ordered values and whether the CTX_KEY_OUTERMOST flag is
* set to false.
*
* Behaviours:
* 1) No time ordering: expects a Sequence of ScanResultValues which each contain up to query.batchSize events.
* The iterator will be "done" when the limit of events is reached. The final ScanResultValue might contain
* fewer than batchSize events so that no more than the limit number of events is returned.
* 2) Time Ordering, CTX_KEY_OUTERMOST false: Same behaviour as no time ordering
* 3) Time Ordering, CTX_KEY_OUTERMOST=true or null: The Sequence processed in this case should contain ScanResultValues
* that contain only one event each for the CachingClusteredClient n-way merge. This iterator will perform
* batching according to query batch size until the limit is reached.
*/
public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private Yielder<ScanResultValue> yielder;
private String resultFormat;
private ScanQuery.ResultFormat resultFormat;
private long limit;
private long count = 0;
private ScanQuery query;
public ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner,
@ -43,11 +63,13 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
Map<String, Object> responseContext
)
{
ScanQuery query = (ScanQuery) queryPlus.getQuery();
resultFormat = query.getResultFormat();
limit = query.getLimit();
Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus, responseContext);
yielder = baseSequence.toYielder(
this.query = (ScanQuery) queryPlus.getQuery();
this.resultFormat = query.getResultFormat();
this.limit = query.getLimit();
Query<ScanResultValue> historicalQuery =
queryPlus.getQuery().withOverriddenContext(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false));
Sequence<ScanResultValue> baseSequence = baseRunner.run(QueryPlus.wrap(historicalQuery), responseContext);
this.yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@ -70,9 +92,15 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
@Override
public ScanResultValue next()
{
if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
// We want to perform multi-event ScanResultValue limiting if we are not time-ordering, or if we are at the
// inner level of a time-ordered query
if (query.getOrder() == ScanQuery.Order.NONE ||
!query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true)) {
ScanResultValue batch = yielder.get();
if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) {
List events = (List) batch.getEvents();
if (events.size() <= limit - count) {
count += events.size();
@ -81,12 +109,26 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
} else {
// last batch
// single batch length is <= Integer.MAX_VALUE, so this should not overflow
int left = (int) (limit - count);
int numLeft = (int) (limit - count);
count = limit;
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, numLeft));
}
} else {
// Perform single-event ScanResultValue batching at the outer level. Each scan result value from the yielder
// in this case will only have one event so there's no need to iterate through events.
int batchSize = query.getBatchSize();
List<Object> eventsToAdd = new ArrayList<>(batchSize);
List<String> columns = new ArrayList<>();
while (eventsToAdd.size() < batchSize && !yielder.isDone() && count < limit) {
ScanResultValue srv = yielder.get();
// Only replace once using the columns from the first event
columns = columns.isEmpty() ? srv.getColumns() : columns;
eventsToAdd.add(Iterables.getOnlyElement((List<Object>) srv.getEvents()));
yielder = yielder.next(null);
count++;
}
return new ScanResultValue(null, columns, eventsToAdd);
}
throw new UnsupportedOperationException(ScanQuery.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
@Override

View File

@ -25,7 +25,6 @@ import com.google.common.base.Functions;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
@ -34,8 +33,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import java.util.Map;
public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
{
private static final TypeReference<ScanResultValue> TYPE_REFERENCE = new TypeReference<ScanResultValue>()
@ -58,18 +55,12 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
@Override
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
)
{
return (queryPlus, responseContext) -> {
// Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it
// the same way, even if they have different default legacy values.
final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
final ScanQuery scanQuery = ((ScanQuery) (queryPlus.getQuery()))
.withNonNullLegacy(scanQueryConfig);
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(queryPlusWithNonNullLegacy, responseContext);
}
@ -87,9 +78,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
{
CloseQuietly.close(iterFromMake);
}
}
);
}
});
};
}
@ -117,18 +106,13 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
@Override
public QueryRunner<ScanResultValue> preMergeQueryDecoration(final QueryRunner<ScanResultValue> runner)
{
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext)
{
return (queryPlus, responseContext) -> {
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
if (scanQuery.getFilter() != null) {
scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize());
queryPlus = queryPlus.withQuery(scanQuery);
}
return runner.run(queryPlus, responseContext);
}
};
}
}

View File

@ -19,22 +19,40 @@
package org.apache.druid.query.scan;
import com.google.common.base.Function;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
{
@ -44,15 +62,18 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
public static final String CTX_COUNT = "count";
private final ScanQueryQueryToolChest toolChest;
private final ScanQueryEngine engine;
private final ScanQueryConfig scanQueryConfig;
@Inject
public ScanQueryRunnerFactory(
ScanQueryQueryToolChest toolChest,
ScanQueryEngine engine
ScanQueryEngine engine,
ScanQueryConfig scanQueryConfig
)
{
this.toolChest = toolChest;
this.engine = engine;
this.scanQueryConfig = scanQueryConfig;
}
@Override
@ -68,34 +89,239 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
)
{
// in single thread and in jetty thread instead of processing thread
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
)
{
return (queryPlus, responseContext) -> {
ScanQuery query = (ScanQuery) queryPlus.getQuery();
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
return Sequences.concat(
if (query.getOrder().equals(ScanQuery.Order.NONE)) {
// Use normal strategy
Sequence<ScanResultValue> returnedRows = Sequences.concat(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<ScanResultValue>, Sequence<ScanResultValue>>()
{
@Override
public Sequence<ScanResultValue> apply(final QueryRunner<ScanResultValue> input)
{
return input.run(queryPlus, responseContext);
}
}
input -> input.run(queryPlus, responseContext)
)
);
if (query.getLimit() <= Integer.MAX_VALUE) {
return returnedRows.limit(Math.toIntExact(query.getLimit()));
} else {
return returnedRows;
}
} else {
// Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need
// to be present for a 1:1 matching of intervals with query runners. The other types of segment spec condense
// the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals
// and query runners.
if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+ "of type MultipleSpecificSegmentSpec");
}
// Ascending time order for both descriptors and query runners by default
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors();
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
// Use priority queue strategy
return priorityQueueSortAndLimit(
Sequences.concat(Sequences.map(
Sequences.simple(queryRunnersOrdered),
input -> input.run(queryPlus, responseContext)
)),
query,
descriptorsOrdered
);
} else {
Preconditions.checkState(
descriptorsOrdered.size() == queryRunnersOrdered.size(),
"Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!"
);
// Combine the two lists of segment descriptors and query runners into a single list of
// segment descriptors - query runner pairs. This makes it easier to use stream operators.
List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
}
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
// query runners for that segment
LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
descriptorsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy(
x -> x.lhs.getInterval(),
LinkedHashMap::new,
Collectors.toList()
));
// Find the segment with the largest number of partitions. This will be used to compare with the
// maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory.
int maxNumPartitionsInSegment =
partitionsGroupedByInterval.values()
.stream()
.map(x -> x.size())
.max(Comparator.comparing(Integer::valueOf))
.get();
if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
// Use n-way merge strategy
// Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->
// there should be no interval overlap. We create a list of lists so we can create a sequence of sequences.
// There's no easy way to convert a LinkedHashMap to a sequence because it's non-iterable.
List<List<QueryRunner<ScanResultValue>>> groupedRunners =
partitionsGroupedByInterval.entrySet()
.stream()
.map(entry -> entry.getValue()
.stream()
.map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
.collect(Collectors.toList()))
.collect(Collectors.toList());
return nWayMergeAndLimit(groupedRunners, queryPlus, responseContext);
}
throw new UOE(
"Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+ " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
+ " %,d partitions or lower the row limit below %,d.",
maxNumPartitionsInSegment,
query.getLimit(),
scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
scanQueryConfig.getMaxRowsQueuedForOrdering()
);
}
}
};
}
@VisibleForTesting
Sequence<ScanResultValue> priorityQueueSortAndLimit(
Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery,
List<SegmentDescriptor> descriptorsOrdered
)
{
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
if (scanQuery.getLimit() > Integer.MAX_VALUE) {
throw new UOE(
"Limit of %,d rows not supported for priority queue strategy of time-ordering scan results",
scanQuery.getLimit()
);
}
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
// only runs if limit <= scanQueryConfig.getMaxRowsQueuedForOrdering() (an int), so the cast cannot overflow
int limit = Math.toIntExact(scanQuery.getLimit());
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
Yielder<ScanResultValue> yielder = inputSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@Override
public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
{
yield();
return in;
}
}
);
boolean doneScanning = yielder.isDone();
// We need to scan limit elements and anything else in the last segment
int numRowsScanned = 0;
Interval finalInterval = null;
while (!doneScanning) {
ScanResultValue next = yielder.get();
List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
for (ScanResultValue srv : singleEventScanResultValues) {
numRowsScanned++;
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format
q.offer(srv);
if (q.size() > limit) {
q.poll();
}
// Finish scanning the interval containing the limit row
if (numRowsScanned > limit && finalInterval == null) {
long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
for (SegmentDescriptor descriptor : descriptorsOrdered) {
if (descriptor.getInterval().contains(timestampOfLimitRow)) {
finalInterval = descriptor.getInterval();
}
}
if (finalInterval == null) {
throw new ISE("WTH??? Row came from an unscanned interval?");
}
}
}
yielder = yielder.next(null);
doneScanning = yielder.isDone() ||
(finalInterval != null &&
!finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat())));
}
// Need to convert to a Deque because Priority Queue's iterator doesn't guarantee that the sorted order
// will be maintained. Deque was chosen over list because its addFirst is O(1).
final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
while (q.size() != 0) {
// addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first.
sortedElements.addFirst(q.poll());
}
return Sequences.simple(sortedElements);
}
@VisibleForTesting
Sequence<ScanResultValue> nWayMergeAndLimit(
List<List<QueryRunner<ScanResultValue>>> groupedRunners,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
)
{
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
Sequence<ScanResultValue> resultSequence =
Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
)
).flatMerge(
seq -> seq,
Ordering.from(new ScanResultValueTimestampComparator(
(ScanQuery) queryPlus.getQuery()
)).reverse()
)
)
);
long limit = ((ScanQuery) (queryPlus.getQuery())).getLimit();
if (limit == Long.MAX_VALUE) {
return resultSequence;
}
return resultSequence.limit(limit);
}
@Override
public QueryToolChest<ScanResultValue, ScanQuery> getToolchest()
{

View File

@ -21,8 +21,14 @@ package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class ScanResultValue implements Comparable<ScanResultValue>
{
@ -40,7 +46,7 @@ public class ScanResultValue implements Comparable<ScanResultValue>
@JsonCreator
public ScanResultValue(
@JsonProperty("segmentId") String segmentId,
@Nullable @JsonProperty("segmentId") String segmentId,
@JsonProperty("columns") List<String> columns,
@JsonProperty("events") Object events
)
@ -50,6 +56,7 @@ public class ScanResultValue implements Comparable<ScanResultValue>
this.events = events;
}
@Nullable
@JsonProperty
public String getSegmentId()
{
@ -68,6 +75,29 @@ public class ScanResultValue implements Comparable<ScanResultValue>
return events;
}
public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
{
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
return (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
return (Long) firstEvent.get(timeColumnIndex);
}
throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
}
public List<ScanResultValue> toSingleEventScanResultValues()
{
List<ScanResultValue> singleEventScanResultValues = new ArrayList<>();
List<Object> events = (List<Object>) this.getEvents();
for (Object event : events) {
singleEventScanResultValues.add(new ScanResultValue(segmentId, columns, Collections.singletonList(event)));
}
return singleEventScanResultValues;
}
@Override
public boolean equals(Object o)
{
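
The two helpers added above are what the time-ordering code paths lean on: getFirstEventTimestamp feeds the timestamp comparator, and toSingleEventScanResultValues unbatches results before sorting or merging. A minimal sketch, not from the patch, using the `list` result format with made-up values:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.segment.column.ColumnHolder;

public class ScanResultValueSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> event =
        ImmutableMap.<String, Object>of(ColumnHolder.TIME_COLUMN_NAME, 1547161200000L, "page", "Druid");
    ScanResultValue batch = new ScanResultValue(
        "segment_1",                                                 // hypothetical segment id
        ImmutableList.of(ColumnHolder.TIME_COLUMN_NAME, "page"),
        ImmutableList.of(event)
    );
    // Timestamp of the first (here, only) event; used by the new timestamp comparator.
    long ts = batch.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
    // One single-event ScanResultValue per row; used to unbatch results before time-ordering.
    List<ScanResultValue> singles = batch.toSingleEventScanResultValues();
    System.out.println(ts + " / " + singles.size()); // 1547161200000 / 1
  }
}
```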

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.primitives.Longs;
import java.util.Comparator;
/**
* This comparator class supports comparisons of ScanResultValues based on the timestamp of their first event. Since
* only the first event is looked at, this Comparator is especially useful for unbatched ScanResultValues (such as in
* {@link ScanQueryQueryToolChest#mergeResults(org.apache.druid.query.QueryRunner)}). The comparator takes a scanQuery as
* a parameter so that it knows the result format (list or compactedList) of the ScanResultValue#events Object. It uses
* this result format to perform the type casts needed to extract the timestamp from that Object and then compares the timestamps.
*/
public class ScanResultValueTimestampComparator implements Comparator<ScanResultValue>
{
private final ScanQuery scanQuery;
public ScanResultValueTimestampComparator(ScanQuery scanQuery)
{
this.scanQuery = scanQuery;
}
@Override
public int compare(ScanResultValue o1, ScanResultValue o2)
{
int comparison;
comparison = Longs.compare(
o1.getFirstEventTimestamp(scanQuery.getResultFormat()),
o2.getFirstEventTimestamp(scanQuery.getResultFormat()));
if (scanQuery.getOrder().equals(ScanQuery.Order.DESCENDING)) {
return comparison;
}
return comparison * -1;
}
}

View File

@ -93,7 +93,8 @@ public class DoubleStorageTest
private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
scanQueryQueryToolChest,
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
);
private Druids.ScanQueryBuilder newTestQuery()

View File

@ -73,7 +73,8 @@ public class MultiSegmentScanQueryTest
private static final QueryRunnerFactory<ScanResultValue, ScanQuery> factory = new ScanQueryRunnerFactory(
toolChest,
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
);
// time modified version of druid.sample.numeric.tsv

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
public class ScanQueryConfigTest
{
private final ObjectMapper MAPPER = new DefaultObjectMapper();
private final ImmutableMap<String, String> CONFIG_MAP = ImmutableMap
.<String, String>builder()
.put("maxSegmentPartitionsOrderedInMemory", "1")
.put("maxRowsQueuedForOrdering", "1")
.put("legacy", "true")
.build();
private final ImmutableMap<String, String> CONFIG_MAP2 = ImmutableMap
.<String, String>builder()
.put("legacy", "false")
.put("maxSegmentPartitionsOrderedInMemory", "42")
.build();
private final ImmutableMap<String, String> CONFIG_MAP_EMPTY = ImmutableMap
.<String, String>builder()
.build();
@Test
public void testSerde()
{
final ScanQueryConfig config = MAPPER.convertValue(CONFIG_MAP, ScanQueryConfig.class);
Assert.assertEquals(1, config.getMaxRowsQueuedForOrdering());
Assert.assertEquals(1, config.getMaxSegmentPartitionsOrderedInMemory());
Assert.assertTrue(config.isLegacy());
final ScanQueryConfig config2 = MAPPER.convertValue(CONFIG_MAP2, ScanQueryConfig.class);
Assert.assertEquals(100000, config2.getMaxRowsQueuedForOrdering());
Assert.assertEquals(42, config2.getMaxSegmentPartitionsOrderedInMemory());
Assert.assertFalse(config2.isLegacy());
final ScanQueryConfig config3 = MAPPER.convertValue(CONFIG_MAP_EMPTY, ScanQueryConfig.class);
Assert.assertEquals(100000, config3.getMaxRowsQueuedForOrdering());
Assert.assertEquals(50, config3.getMaxSegmentPartitionsOrderedInMemory());
Assert.assertFalse(config3.isLegacy());
}
}

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
public class ScanQueryLimitRowIteratorTest
{
private static final int NUM_ELEMENTS = 1000;
private static int batchSize;
private static int limit;
private static List<ScanResultValue> singleEventScanResultValues = new ArrayList<>();
private static List<ScanResultValue> multiEventScanResultValues = new ArrayList<>();
private static final ScanQuery.ResultFormat resultFormat = ScanQuery.ResultFormat.RESULT_FORMAT_LIST;
public ScanQueryLimitRowIteratorTest(
final int batchSize,
final int limit
)
{
this.batchSize = batchSize;
this.limit = limit;
}
@Parameterized.Parameters(name = "{0} {1}")
public static Iterable<Object[]> constructorFeeder()
{
List<Integer> batchSizes = ImmutableList.of(1, 33);
List<Integer> limits = ImmutableList.of(3, 10000);
return QueryRunnerTestHelper.cartesian(
batchSizes,
limits
);
}
@Before
public void setup()
{
singleEventScanResultValues = new ArrayList<>();
multiEventScanResultValues = new ArrayList<>();
for (int i = 0; i < NUM_ELEMENTS; i++) {
singleEventScanResultValues.add(
ScanQueryTestHelper.generateScanResultValue(
ThreadLocalRandom.current().nextLong(),
resultFormat,
1
));
}
for (int i = 0; i < NUM_ELEMENTS / batchSize; i++) {
multiEventScanResultValues.add(
ScanQueryTestHelper.generateScanResultValue(
ThreadLocalRandom.current().nextLong(),
resultFormat,
batchSize
));
}
multiEventScanResultValues.add(
ScanQueryTestHelper.generateScanResultValue(
ThreadLocalRandom.current().nextLong(),
resultFormat,
NUM_ELEMENTS % batchSize
));
}
/**
* Expect no batching to occur and limit to be applied
*/
@Test
public void testNonOrderedScan()
{
ScanQuery query = Druids.newScanQueryBuilder()
.limit(limit)
.order(ScanQuery.Order.NONE)
.dataSource("some datasource")
.batchSize(batchSize)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.resultFormat(resultFormat)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
.build();
QueryPlus<ScanResultValue> queryPlus = QueryPlus.wrap(query);
ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
((queryInput, responseContext) -> Sequences.simple(multiEventScanResultValues)),
queryPlus,
ImmutableMap.of()
);
int count = 0;
int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
while (itr.hasNext()) {
ScanResultValue curr = itr.next();
List<Map<String, Object>> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
if (events.size() != batchSize) {
if (expectedNumRows - count > batchSize) {
Assert.fail("Batch size is incorrect");
} else {
Assert.assertEquals(expectedNumRows - count, events.size());
}
}
count += events.size();
}
Assert.assertEquals(expectedNumRows, count);
}
/**
* Expect batching to occur and limit to be applied on the Broker. Input from Historical
* is a sequence of single-event ScanResultValues.
*/
@Test
public void testBrokerOrderedScan()
{
ScanQuery query = Druids.newScanQueryBuilder()
.limit(limit)
.order(ScanQuery.Order.DESCENDING)
.dataSource("some datasource")
.batchSize(batchSize)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.resultFormat(resultFormat)
.build();
QueryPlus<ScanResultValue> queryPlus = QueryPlus.wrap(query);
ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
((queryInput, responseContext) -> Sequences.simple(singleEventScanResultValues)),
queryPlus,
ImmutableMap.of()
);
int count = 0;
int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
while (itr.hasNext()) {
ScanResultValue curr = itr.next();
List<Map<String, Object>> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
if (events.size() != batchSize) {
if (expectedNumRows - count >= batchSize) {
Assert.fail("Batch size is incorrect");
} else {
Assert.assertEquals(expectedNumRows - count, events.size());
}
}
count += events.size();
}
Assert.assertEquals(expectedNumRows, count);
}
/**
* Expect no batching to occur and limit to be applied. Input is a sequence of sorted single-event ScanResultValues
* (unbatching and sorting occurs in ScanQueryRunnerFactory#mergeRunners()).
*/
@Test
public void testHistoricalOrderedScan()
{
ScanQuery query = Druids.newScanQueryBuilder()
.limit(limit)
.order(ScanQuery.Order.DESCENDING)
.dataSource("some datasource")
.batchSize(batchSize)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.resultFormat(resultFormat)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
.build();
QueryPlus<ScanResultValue> queryPlus = QueryPlus.wrap(query);
ScanQueryLimitRowIterator itr = new ScanQueryLimitRowIterator(
((queryInput, responseContext) -> Sequences.simple(singleEventScanResultValues)),
queryPlus,
ImmutableMap.of()
);
int count = 0;
int expectedNumRows = Math.min(limit, NUM_ELEMENTS);
while (itr.hasNext()) {
ScanResultValue curr = itr.next();
List<Map<String, Object>> events = ScanQueryTestHelper.getEventsListResultFormat(curr);
Assert.assertEquals(1, events.size());
count += events.size();
}
Assert.assertEquals(expectedNumRows, count);
}
}
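
For orientation, the three tests above differ only in the query's order and the ScanQuery.CTX_KEY_OUTERMOST context flag. Below is a minimal sketch of the wiring they exercise; the helper class is hypothetical, it assumes the same test classpath as the class above, and the lambda stands in for a real downstream runner.
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
public class ScanLimitRowIteratorSketch
{
  public static ScanQueryLimitRowIterator brokerSideIterator()
  {
    // Outermost, time-ordered query: single-event inputs are re-batched to
    // batchSize on the Broker before the limit is applied.
    ScanQuery query = Druids.newScanQueryBuilder()
        .dataSource("some datasource")
        .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
        .order(ScanQuery.Order.DESCENDING)
        .batchSize(33)
        .limit(3)
        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
        .build();
    // Setting ScanQuery.CTX_KEY_OUTERMOST to false instead would mark the
    // Historical-side (inner) pass, where no re-batching happens.
    return new ScanQueryLimitRowIterator(
        (queryPlus, responseContext) -> Sequences.empty(), // stand-in runner
        QueryPlus.wrap(query),
        ImmutableMap.of()
    );
  }
}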

View File

@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@RunWith(Parameterized.class)
public class ScanQueryRunnerFactoryTest
{
private int numElements;
private ScanQuery query;
private ScanQuery.ResultFormat resultFormat;
private static final ScanQueryRunnerFactory factory = new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
DefaultGenericQueryMetricsFactory.instance()
),
new ScanQueryEngine(),
new ScanQueryConfig()
);
public ScanQueryRunnerFactoryTest(
final int numElements,
final int batchSize,
final long limit,
final ScanQuery.ResultFormat resultFormat,
final ScanQuery.Order order
)
{
this.numElements = numElements;
this.query = Druids.newScanQueryBuilder()
.batchSize(batchSize)
.limit(limit)
.order(order)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.dataSource("some datasource")
.resultFormat(resultFormat)
.build();
this.resultFormat = resultFormat;
}
@Parameterized.Parameters(name = "{0} {1} {2} {3} {4}")
public static Iterable<Object[]> constructorFeeder()
{
List<Integer> numsElements = ImmutableList.of(0, 10, 100);
List<Integer> batchSizes = ImmutableList.of(1, 100);
List<Long> limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE);
List<ScanQuery.ResultFormat> resultFormats = ImmutableList.of(
ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST
);
List<ScanQuery.Order> order = ImmutableList.of(
ScanQuery.Order.ASCENDING,
ScanQuery.Order.DESCENDING
);
return QueryRunnerTestHelper.cartesian(
numsElements,
batchSizes,
limits,
resultFormats,
order
);
}
@Test
public void testSortAndLimitScanResultValues()
{
List<ScanResultValue> srvs = new ArrayList<>(numElements);
List<Long> expectedEventTimestamps = new ArrayList<>();
for (int i = 0; i < numElements; i++) {
long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis();
expectedEventTimestamps.add(timestamp);
srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
expectedEventTimestamps.sort((o1, o2) -> {
int retVal = 0;
if (o1 > o2) {
retVal = 1;
} else if (o1 < o2) {
retVal = -1;
}
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
return retVal * -1;
}
return retVal;
});
Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
try {
List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
inputSequence,
query,
ImmutableList.of(new SegmentDescriptor(new Interval(
DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1)
), "1", 0))
).toList();
if (query.getLimit() > Integer.MAX_VALUE) {
Assert.fail("Unsupported exception should have been thrown due to high limit");
}
validateSortedOutput(output, expectedEventTimestamps);
}
catch (UOE e) {
if (query.getLimit() <= Integer.MAX_VALUE) {
Assert.fail("Unsupported operation exception should not have been thrown here");
}
}
}
@Test
public void testNWayMerge()
{
List<Long> expectedEventTimestamps = new ArrayList<>(numElements * 3);
List<ScanResultValue> scanResultValues1 = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i++) {
long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis();
expectedEventTimestamps.add(timestamp);
scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
List<ScanResultValue> scanResultValues2 = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i++) {
long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis();
expectedEventTimestamps.add(timestamp);
scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
List<ScanResultValue> scanResultValues3 = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i++) {
long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis();
expectedEventTimestamps.add(timestamp);
scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
if (query.getOrder() == ScanQuery.Order.DESCENDING) {
Collections.reverse(scanResultValues1);
Collections.reverse(scanResultValues2);
Collections.reverse(scanResultValues3);
}
QueryRunner<ScanResultValue> runnerSegment1Partition1 =
(queryPlus, responseContext) -> Sequences.simple(scanResultValues1);
QueryRunner<ScanResultValue> runnerSegment1Partition2 =
(queryPlus, responseContext) -> Sequences.simple(scanResultValues2);
QueryRunner<ScanResultValue> runnerSegment2Partition1 =
(queryPlus, responseContext) -> Sequences.simple(scanResultValues3);
QueryRunner<ScanResultValue> runnerSegment2Partition2 =
(queryPlus, responseContext) -> Sequences.empty();
List<List<QueryRunner<ScanResultValue>>> groupedRunners = new ArrayList<>(2);
if (query.getOrder() == ScanQuery.Order.DESCENDING) {
groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
} else {
groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
}
expectedEventTimestamps.sort((o1, o2) -> {
int retVal = 0;
if (o1 > o2) {
retVal = 1;
} else if (o1 < o2) {
retVal = -1;
}
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
return retVal * -1;
}
return retVal;
});
List<ScanResultValue> output =
factory.nWayMergeAndLimit(
groupedRunners,
QueryPlus.wrap(query),
ImmutableMap.of()
).toList();
validateSortedOutput(output, expectedEventTimestamps);
}
private void validateSortedOutput(List<ScanResultValue> output, List<Long> expectedEventTimestamps)
{
// check each scan result value has one event
for (ScanResultValue srv : output) {
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1);
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1);
}
}
// check total # of rows <= limit
Assert.assertTrue(output.size() <= query.getLimit());
// check ordering is correct
for (int i = 1; i < output.size(); i++) {
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) <
output.get(i - 1).getFirstEventTimestamp(resultFormat));
} else {
Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) >
output.get(i - 1).getFirstEventTimestamp(resultFormat));
}
}
// check the values are correct
for (int i = 0; i < query.getLimit() && i < output.size(); i++) {
Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat));
}
}
}
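
As a reading aid for the two strategies tested above, here is a hedged sketch (hypothetical helper on the same test classpath) of how the factory's two entry points are fed: priorityQueueSortAndLimit takes one already-unbatched sequence plus its segment descriptors, while nWayMergeAndLimit takes runners grouped per segment, each inner list holding that segment's partition chunks and the outer list following the query's time order.
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class ScanMergeStrategySketch
{
  // Strategy 1: bounded priority-queue sort over a single unbatched sequence.
  static List<ScanResultValue> sortOneSequence(
      ScanQueryRunnerFactory factory,
      ScanQuery query,
      Sequence<ScanResultValue> rows
  )
  {
    return factory.priorityQueueSortAndLimit(
        rows,
        query,
        ImmutableList.of(new SegmentDescriptor(
            new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01")),
            "1",
            0
        ))
    ).toList();
  }
  // Strategy 2: n-way merge of per-segment runner groups.
  static List<ScanResultValue> mergeGroupedRunners(
      ScanQueryRunnerFactory factory,
      ScanQuery query,
      QueryRunner<ScanResultValue> seg1Partition1,
      QueryRunner<ScanResultValue> seg1Partition2,
      QueryRunner<ScanResultValue> seg2Partition1
  )
  {
    List<List<QueryRunner<ScanResultValue>>> groupedRunners = Arrays.asList(
        Arrays.asList(seg1Partition1, seg1Partition2),
        Collections.singletonList(seg2Partition1)
    );
    return factory.nWayMergeAndLimit(groupedRunners, QueryPlus.wrap(query), ImmutableMap.of()).toList();
  }
}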

View File

@ -21,11 +21,13 @@ package org.apache.druid.query.scan;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import org.apache.commons.lang.ArrayUtils;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
@ -121,7 +123,8 @@ public class ScanQueryRunnerTest
QueryRunnerTestHelper.makeQueryRunners(
new ScanQueryRunnerFactory(
toolChest,
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
)
),
ImmutableList.of(false, true)
@ -215,7 +218,7 @@ public class ScanQueryRunnerTest
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.virtualColumns(EXPR_COLUMN)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
@ -235,7 +238,11 @@ public class ScanQueryRunnerTest
{
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.columns(ColumnHolder.TIME_COLUMN_NAME, QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric)
.columns(
ColumnHolder.TIME_COLUMN_NAME,
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.indexMetric
)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
@ -317,7 +324,7 @@ public class ScanQueryRunnerTest
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
@ -509,6 +516,251 @@ public class ScanQueryRunnerTest
verify(expectedResults, results);
}
@Test
public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingListFormat()
{
// limits shouldn't matter -> all rows should be returned if time-ordering on the broker is occurring
for (int limit : new int[]{3, 1, 5, 7, 0}) {
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.limit(limit)
.order(ScanQuery.Order.ASCENDING)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
final List<List<Map<String, Object>>> ascendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
List<ScanResultValue> ascendingExpectedResults = toExpected(
ascendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
verify(ascendingExpectedResults, results);
}
}
@Test
public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingListFormat()
{
// limits shouldn't matter -> all rows should be returned if time-ordering on the broker is occurring
for (int limit : new int[]{3, 1, 5, 7, 0}) {
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.limit(limit)
.order(ScanQuery.Order.DESCENDING)
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
String[] expectedRet = (String[]) ArrayUtils.addAll(seg1Results, seg2Results);
ArrayUtils.reverse(expectedRet);
final List<List<Map<String, Object>>> descendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
expectedRet
);
List<ScanResultValue> descendingExpectedResults = toExpected(
descendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
verify(descendingExpectedResults, results);
}
}
@Test
public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingCompactedListFormat()
{
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
// limits shouldn't matter -> all rows should be returned if time-ordering on the broker is occurring
for (int limit : new int[]{3, 0}) {
/* Ascending */
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.order(ScanQuery.Order.ASCENDING)
.limit(limit)
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
final List<List<Map<String, Object>>> ascendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
List<ScanResultValue> ascendingExpectedResults = toExpected(
ascendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
results = compactedListToRow(results);
verify(ascendingExpectedResults, results);
}
}
@Test
public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedListFormat()
{
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
// limits shouldn't matter -> all rows should be returned if time-ordering on the broker is occurring
for (int limit : new int[]{3, 1}) {
/* Descending */
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.order(ScanQuery.Order.DESCENDING)
.context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
.limit(limit)
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
String[] expectedRet = (String[]) ArrayUtils.addAll(seg1Results, seg2Results);
ArrayUtils.reverse(expectedRet);
final List<List<Map<String, Object>>> descendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
expectedRet // segments in reverse order from above
);
List<ScanResultValue> descendingExpectedResults = toExpected(
descendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
results = compactedListToRow(results);
verify(descendingExpectedResults, results);
}
}
private List<List<Map<String, Object>>> toFullEvents(final String[]... valueSet)
{
return toEvents(
@ -687,7 +939,12 @@ public class ScanQueryRunnerTest
Object exValue = ex.getValue();
if (exValue instanceof Double || exValue instanceof Float) {
final double expectedDoubleValue = ((Number) exValue).doubleValue();
Assert.assertEquals("invalid value for " + ex.getKey(), expectedDoubleValue, ((Number) actVal).doubleValue(), expectedDoubleValue * 1e-6);
Assert.assertEquals(
"invalid value for " + ex.getKey(),
expectedDoubleValue,
((Number) actVal).doubleValue(),
expectedDoubleValue * 1e-6
);
} else {
Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
}

View File

@ -53,6 +53,7 @@ public class ScanQuerySpecTest
+ "\"resultFormat\":\"list\","
+ "\"batchSize\":20480,"
+ "\"limit\":3,"
+ "\"order\":\"none\","
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
+ "\"legacy\":null,"
@ -64,9 +65,10 @@ public class ScanQuerySpecTest
new TableDataSource(QueryRunnerTestHelper.dataSource),
new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")),
VirtualColumns.EMPTY,
null,
ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
0,
3,
ScanQuery.Order.NONE,
null,
Arrays.asList("market", "quality", "index"),
null,

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.column.ColumnHolder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ScanQueryTestHelper
{
public static ScanResultValue generateScanResultValue(
long timestamp,
ScanQuery.ResultFormat resultFormat,
int batchSize
)
{
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
List<Object> events = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
Object event;
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Map<String, Object> eventMap = new HashMap<>();
eventMap.put(ColumnHolder.TIME_COLUMN_NAME, timestamp);
eventMap.put("name", "Feridun");
eventMap.put("count", i);
event = eventMap;
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
event = new ArrayList<>(Arrays.asList(
timestamp,
"Feridun",
i
));
} else {
throw new UOE("Result format [%s] not supported yet", resultFormat.toString());
}
events.add(event);
}
return new ScanResultValue(segmentId, columns, events);
}
public static List<Map<String, Object>> getEventsListResultFormat(ScanResultValue scanResultValue)
{
return (List<Map<String, Object>>) scanResultValue.getEvents();
}
public static List<List<Object>> getEventsCompactedListResultFormat(ScanResultValue scanResultValue)
{
return (List<List<Object>>) scanResultValue.getEvents();
}
}
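
A quick usage sketch of the helper above (hypothetical class on the same test classpath; the values follow the helper's hard-coded "Feridun" rows):
package org.apache.druid.query.scan;
import java.util.List;
import java.util.Map;
public class ScanQueryTestHelperUsage
{
  public static void main(String[] args)
  {
    // Three rows sharing one timestamp, in the map-per-event "list" format.
    ScanResultValue listValue = ScanQueryTestHelper.generateScanResultValue(
        1234567890000L,
        ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
        3
    );
    List<Map<String, Object>> events = ScanQueryTestHelper.getEventsListResultFormat(listValue);
    System.out.println(events.size()); // 3
    // The same rows in the positional "compactedList" format.
    ScanResultValue compactedValue = ScanQueryTestHelper.generateScanResultValue(
        1234567890000L,
        ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
        3
    );
    List<List<Object>> rows = ScanQueryTestHelper.getEventsCompactedListResultFormat(compactedValue);
    System.out.println(rows.get(0)); // [1234567890000, Feridun, 0]
  }
}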

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.column.ColumnHolder;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ScanResultValueTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final long TIME_1 = 1234567890000L;
private static final long TIME_2 = 9876543210000L;
private static ScanResultValue compactedListSRV;
private static ScanResultValue listSRV;
@BeforeClass
public static void setup()
{
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
List<Object> event = new ArrayList<>(Arrays.asList(
TIME_1,
"Feridun",
4
));
List<Object> event2 = new ArrayList<>(Arrays.asList(
TIME_2,
"Justin",
6
));
List<List<Object>> events = Arrays.asList(event, event2);
compactedListSRV = new ScanResultValue(segmentId, columns, events);
Map<String, Object> eventMap1 = new HashMap<>();
eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1);
eventMap1.put("name", "Feridun");
eventMap1.put("count", 4);
Map<String, Object> eventMap2 = new HashMap<>();
eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2);
eventMap2.put("name", "Justin");
eventMap2.put("count", 6);
List<Map<String, Object>> eventMaps = Arrays.asList(eventMap1, eventMap2);
listSRV = new ScanResultValue(segmentId, columns, eventMaps);
}
@Test
public void testSerdeScanResultValueCompactedList() throws IOException
{
String serialized = JSON_MAPPER.writeValueAsString(compactedListSRV);
ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
Assert.assertEquals(compactedListSRV, deserialized);
}
@Test
public void testSerdeScanResultValueNonCompactedList() throws IOException
{
String serialized = JSON_MAPPER.writeValueAsString(listSRV);
ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
Assert.assertEquals(listSRV, deserialized);
}
@Test
public void testGetFirstEventTimestampCompactedList()
{
long timestamp = compactedListSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
Assert.assertEquals(TIME_1, timestamp);
}
@Test
public void testGetFirstEventTimestampNonCompactedList()
{
long timestamp = listSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
Assert.assertEquals(TIME_1, timestamp);
}
@Test
public void testToSingleEventScanResultValues()
{
List<ScanResultValue> compactedListScanResultValues = compactedListSRV.toSingleEventScanResultValues();
for (ScanResultValue srv : compactedListScanResultValues) {
List<Object> events = (List<Object>) srv.getEvents();
Assert.assertEquals(1, events.size());
}
List<ScanResultValue> listScanResultValues = listSRV.toSingleEventScanResultValues();
for (ScanResultValue srv : listScanResultValues) {
List<Object> events = (List<Object>) srv.getEvents();
Assert.assertEquals(1, events.size());
}
}
}

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Druids;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
public class ScanResultValueTimestampComparatorTest
{
private static QuerySegmentSpec intervalSpec;
@BeforeClass
public static void setup()
{
intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(
new Interval(DateTimes.of("2012-01-01"), DateTimes.of("2012-01-01").plusHours(1))
)
);
}
@Test
public void comparisonDescendingListTest()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query);
ArrayList<HashMap<String, Object>> events1 = new ArrayList<>();
HashMap<String, Object> event1 = new HashMap<>();
event1.put(ColumnHolder.TIME_COLUMN_NAME, new Long(42));
events1.add(event1);
ScanResultValue s1 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events1
);
ArrayList<HashMap<String, Object>> events2 = new ArrayList<>();
HashMap<String, Object> event2 = new HashMap<>();
event2.put(ColumnHolder.TIME_COLUMN_NAME, new Long(43));
events2.add(event2);
ScanResultValue s2 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events2
);
Assert.assertEquals(-1, comparator.compare(s1, s2));
}
@Test
public void comparisonAscendingListTest()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query);
ArrayList<HashMap<String, Object>> events1 = new ArrayList<>();
HashMap<String, Object> event1 = new HashMap<>();
event1.put(ColumnHolder.TIME_COLUMN_NAME, new Long(42));
events1.add(event1);
ScanResultValue s1 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events1
);
ArrayList<HashMap<String, Object>> events2 = new ArrayList<>();
HashMap<String, Object> event2 = new HashMap<>();
event2.put(ColumnHolder.TIME_COLUMN_NAME, new Long(43));
events2.add(event2);
ScanResultValue s2 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events2
);
Assert.assertEquals(1, comparator.compare(s1, s2));
}
@Test
public void comparisonDescendingCompactedListTest()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query);
List<List<Object>> events1 = new ArrayList<>();
List<Object> event1 = Collections.singletonList(new Long(42));
events1.add(event1);
ScanResultValue s1 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events1
);
List<List<Object>> events2 = new ArrayList<>();
List<Object> event2 = Collections.singletonList(new Long(43));
events2.add(event2);
ScanResultValue s2 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events2
);
Assert.assertEquals(-1, comparator.compare(s1, s2));
}
@Test
public void comparisonAscendingCompactedListTest()
{
ScanQuery query = Druids.newScanQueryBuilder()
.order(ScanQuery.Order.ASCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query);
List<List<Object>> events1 = new ArrayList<>();
List<Object> event1 = Collections.singletonList(new Long(42));
events1.add(event1);
ScanResultValue s1 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events1
);
List<List<Object>> events2 = new ArrayList<>();
List<Object> event2 = Collections.singletonList(new Long(43));
events2.add(event2);
ScanResultValue s2 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events2
);
Assert.assertEquals(1, comparator.compare(s1, s2));
}
}
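
A note on the sign convention asserted above: for a DESCENDING query the comparator treats the earlier timestamp as smaller, and for an ASCENDING query it treats the earlier timestamp as larger. That looks inverted for a plain sort, but it is exactly what a bounded min-heap wants when keeping only the first N (= limit) rows of the query's ordering, since the heap's head is then the row to evict. The following is a hedged illustration of that idea, not the factory's actual code, and it assumes ScanResultValueTimestampComparator implements java.util.Comparator, as its compare() usage in the tests suggests.
package org.apache.druid.query.scan;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
public class TimestampComparatorSketch
{
  // Keep only the `limit` rows that come first under the query's time order.
  // Assumption: the comparator is a java.util.Comparator over ScanResultValue
  // (it is invoked as one in the tests above).
  static List<ScanResultValue> keepFirstRows(
      ScanResultValueTimestampComparator comparator,
      Iterable<ScanResultValue> rows,
      int limit
  )
  {
    PriorityQueue<ScanResultValue> heap = new PriorityQueue<>(comparator);
    for (ScanResultValue row : rows) {
      heap.offer(row);
      if (heap.size() > limit) {
        // The head is the row that would be emitted last, so it is dropped.
        heap.poll();
      }
    }
    // Re-sort the survivors into emission order for the caller.
    List<ScanResultValue> result = new ArrayList<>(heap);
    result.sort(comparator.reversed());
    return result;
  }
}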

View File

@ -199,7 +199,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
/**
* This class essentially incapsulates the major part of the logic of {@link CachingClusteredClient}. It's state and
* This class essentially encapsulates the major part of the logic of {@link CachingClusteredClient}. Its state and
* methods couldn't belong to {@link CachingClusteredClient} itself, because they depend on the specific query object
* being run, but {@link QuerySegmentWalker} API is designed so that implementations should be able to accept
* arbitrary queries.

View File

@ -197,7 +197,7 @@ public class ServerManager implements QuerySegmentWalker
);
return CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<T>(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
),

View File

@ -988,9 +988,10 @@ public class DruidQuery
dataSource,
filtration.getQuerySegmentSpec(),
selectProjection != null ? VirtualColumns.create(selectProjection.getVirtualColumns()) : VirtualColumns.EMPTY,
ScanQuery.RESULT_FORMAT_COMPACTED_LIST,
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
0,
scanLimit,
null, // Will default to "none"
filtration.getDimFilter(),
Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())),
false,

View File

@ -370,7 +370,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public static Druids.ScanQueryBuilder newScanQueryBuilder()
{
return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false);
}

View File

@ -111,7 +111,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "2", ValueType.LONG))
.columns("dim1", "v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@ -431,7 +431,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -465,7 +465,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.dataSource(CalciteTests.FORBIDDEN_DATASOURCE)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -511,7 +511,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of(),
ImmutableList.of(
new Object[]{
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n"
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n"
}
)
);
@ -531,7 +531,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.limit(2)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -556,7 +556,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.columns("v0")
.limit(2)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -652,7 +652,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim2")
.limit(2)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -744,14 +744,14 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1")
.filters(not(selector("dim1", "", null)))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build(),
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1", "dim2")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -767,10 +767,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
String emptyStringEq = NullHandling.replaceWithDefault() ? null : "\"\"";
final String explanation =
"BindableJoin(condition=[=($0, $2)], joinType=[inner])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":"
+ emptyStringEq
+ ",\"extractionFn\":null}},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n";
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n";
testQuery(
PLANNER_CONFIG_FALLBACK,
@ -1879,7 +1879,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
)
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -6669,7 +6669,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
+ " BindableFilter(condition=[OR(=($0, 'xxx'), CAST(AND(IS NOT NULL($4), <>($2, 0), IS NOT NULL($1))):BOOLEAN)])\n"
+ " BindableJoin(condition=[=($1, $3)], joinType=[left])\n"
+ " BindableJoin(condition=[true], joinType=[inner])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":{\"type\":\"like\",\"dimension\":\"dim1\",\"pattern\":\"%bc\",\"escape\":null,\"extractionFn\":null},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"limit\":2147483647,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"skipEmptyBuckets\":true,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"}}], signature=[{a0:LONG}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"1\",\"outputType\":\"LONG\"}],\"filter\":{\"type\":\"like\",\"dimension\":\"dim1\",\"pattern\":\"%bc\",\"escape\":null,\"extractionFn\":null},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\"},{\"type\":\"default\",\"dimension\":\"v0\",\"outputName\":\"v0\",\"outputType\":\"LONG\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\"},\"descending\":false}], signature=[{d0:STRING, v0:LONG}])\n";
@ -6739,7 +6739,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
))
.columns("__time", "cnt", "dim1", "dim2")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -7294,7 +7294,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ValueType.STRING
))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -7320,7 +7320,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ValueType.STRING
))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -7346,7 +7346,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "concat(\"dim1\",\"dim1\")", ValueType.STRING))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -7372,7 +7372,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ValueType.STRING
))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -7568,7 +7568,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1")
.filters(selector("f1", "0.1", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@ -7590,7 +7590,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1")
.filters(selector("d1", "1.7", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@ -7612,7 +7612,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1")
.filters(selector("l1", "7", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()

View File

@ -508,7 +508,8 @@ public class CalciteTests
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
),
new ScanQueryEngine()
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.put(