Allow max rows and max segments for time-ordered scans to be overridden using the scan query JSON spec (#7413)

* Initial changes

* Fixed NPEs

* Fixed failing spec test

* Fixed failing Calcite test

* Move configs to context

* Validated and added docs

* fixed weird indentation

* Update default context vals in doc

* Fixed allowable values
This commit is contained in:
Justin Borromeo 2019-04-07 20:12:52 -07:00 committed by Fangjin Yang
parent 7778f29781
commit 799c66d9ac
5 changed files with 88 additions and 12 deletions

View File

@ -63,7 +63,7 @@ The following are the main parameters for Scan queries:
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to an order of "none".|none|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags.|no|
|context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no|
## Example results
@ -179,7 +179,9 @@ decompression and decoding buffers for each. The `druid.query.scan.maxSegmentPa
from this by capping the number of partitions opened at any time when time ordering is used.
Both `druid.query.scan.maxRowsQueuedForOrdering` and `druid.query.scan.maxSegmentPartitionsOrderedInMemory` are
configurable and can be tuned based on hardware specs and number of dimensions being queried.
configurable and can be tuned based on hardware specs and number of dimensions being queried. These config properties
can also be overridden using the `maxRowsQueuedForOrdering` and `maxSegmentPartitionsOrderedInMemory` properties in
the query context (see the Query Context Properties section).
## Legacy mode
@ -198,8 +200,27 @@ is complete.
## Configuration Properties
Configuration properties:
|property|description|values|default|
|--------|-----------|------|-------|
|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [0, 2147483647]|100000|
|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [0, 2147483647]|50|
|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [1, 2147483647]|100000|
|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [1, 2147483647]|50|
|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
## Query Context Properties
|property|description|values|default|
|--------|-----------|------|-------|
|maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used. Overrides the identically named config.|An integer in [1, 2147483647]|`druid.query.scan.maxRowsQueuedForOrdering`|
|maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used. Overrides the identically named config.|An integer in [1, 2147483647]|`druid.query.scan.maxSegmentPartitionsOrderedInMemory`|
Sample query context JSON object:
```json
{
"maxRowsQueuedForOrdering": 100001,
"maxSegmentPartitionsOrderedInMemory": 100
}
```

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
@ -34,6 +35,7 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -112,6 +114,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
private final List<String> columns;
private final Boolean legacy;
private final Order order;
private final Integer maxRowsQueuedForOrdering;
private final Integer maxSegmentPartitionsOrderedInMemory;
@JsonCreator
public ScanQuery(
@ -132,13 +136,43 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.resultFormat = (resultFormat == null) ? ResultFormat.RESULT_FORMAT_LIST : resultFormat;
this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
Preconditions.checkArgument(
this.batchSize > 0,
"batchSize must be greater than 0"
);
this.limit = (limit == 0) ? Long.MAX_VALUE : limit;
Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0");
Preconditions.checkArgument(
this.limit > 0,
"limit must be greater than 0"
);
this.dimFilter = dimFilter;
this.columns = columns;
this.legacy = legacy;
this.order = order == null ? Order.NONE : order;
this.order = (order == null) ? Order.NONE : order;
this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
}
/**
 * Reads the "maxRowsQueuedForOrdering" override from the query context and validates it.
 *
 * @return the positive context override, or null when the context does not supply one
 *         (callers fall back to the cluster-wide {@code druid.query.scan.maxRowsQueuedForOrdering})
 * @throws IllegalArgumentException if a non-positive value was supplied in the context
 */
private Integer validateAndGetMaxRowsQueuedForOrdering()
{
  final Integer contextMaxRowsQueuedForOrdering =
      getContextValue(ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING, null);
  if (contextMaxRowsQueuedForOrdering != null) {
    // An absent override is fine; a present one must be strictly positive.
    Preconditions.checkArgument(
        contextMaxRowsQueuedForOrdering > 0,
        "maxRowsQueuedForOrdering must be greater than 0"
    );
  }
  return contextMaxRowsQueuedForOrdering;
}
/**
 * Reads the "maxSegmentPartitionsOrderedInMemory" override from the query context and validates it.
 *
 * @return the positive context override, or null when the context does not supply one
 *         (callers fall back to the cluster-wide
 *         {@code druid.query.scan.maxSegmentPartitionsOrderedInMemory})
 * @throws IllegalArgumentException if a non-positive value was supplied in the context
 */
private Integer validateAndGetMaxSegmentPartitionsOrderedInMemory()
{
  final Integer maxSegmentPartitionsOrderedInMemory =
      getContextValue(ScanQueryConfig.CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING, null);
  // BUG FIX: the failure message previously said "maxRowsQueuedForOrdering" (copy-paste from the
  // sibling validator), which would mislead users about which context key was rejected.
  Preconditions.checkArgument(
      maxSegmentPartitionsOrderedInMemory == null || maxSegmentPartitionsOrderedInMemory > 0,
      "maxSegmentPartitionsOrderedInMemory must be greater than 0"
  );
  return maxSegmentPartitionsOrderedInMemory;
}
@JsonProperty
@ -171,6 +205,20 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return order;
}
/**
 * Returns the "maxRowsQueuedForOrdering" override taken from the query context at construction
 * time, or null if the context did not specify one. Not serialized back into the query JSON
 * because it is already carried by the context (hence {@code @JsonIgnore}).
 */
@Nullable
@JsonIgnore
public Integer getMaxRowsQueuedForOrdering()
{
  return maxRowsQueuedForOrdering;
}
/**
 * Returns the "maxSegmentPartitionsOrderedInMemory" override taken from the query context at
 * construction time, or null if the context did not specify one. Not serialized back into the
 * query JSON because it is already carried by the context (hence {@code @JsonIgnore}).
 */
@Nullable
@JsonIgnore
public Integer getMaxSegmentPartitionsOrderedInMemory()
{
  return maxSegmentPartitionsOrderedInMemory;
}
@Override
public boolean hasFilters()
{

View File

@ -25,6 +25,9 @@ import java.util.Objects;
public class ScanQueryConfig
{
public static final String CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING = "maxRowsQueuedForOrdering";
public static final String CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING = "maxSegmentPartitionsOrderedInMemory";
@JsonProperty
private boolean legacy = false;

View File

@ -128,8 +128,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
? scanQueryConfig.getMaxRowsQueuedForOrdering()
: query.getMaxRowsQueuedForOrdering());
if (query.getLimit() <= maxRowsQueuedForOrdering) {
// Use priority queue strategy
return priorityQueueSortAndLimit(
Sequences.concat(Sequences.map(
@ -172,7 +174,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
.max(Comparator.comparing(Integer::valueOf))
.get();
if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
int segmentPartitionLimit = (query.getMaxSegmentPartitionsOrderedInMemory() == null
? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
: query.getMaxSegmentPartitionsOrderedInMemory());
if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
// Use n-way merge strategy
// Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->

View File

@ -85,7 +85,6 @@ import java.util.List;
public class CalciteQueryTest extends BaseCalciteQueryTest
{
@Test
public void testSelectConstantExpression() throws Exception
{