diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 3896a8eec72..b33df90f0ef 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -170,6 +170,11 @@ org.apache.datasketches datasketches-memory + + com.google.inject + guice + + junit junit diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java index 8ffed2f764c..7d9a55d904b 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java @@ -29,13 +29,19 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchApproxQuantileSqlAggregator; +import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchObjectSqlAggregator; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion; import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.DruidPlanner; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerFactory; @@ -60,8 +66,10 @@ import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; import javax.annotation.Nullable; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -365,7 +373,12 @@ public class SqlBenchmark + " GROUP BY dimSequential\n" + " )\n" + ")\n" - + "SELECT * FROM matrix" + + "SELECT * FROM matrix", + + // 20: GroupBy, doubles sketches + "SELECT dimZipf, APPROX_QUANTILE_DS(sumFloatNormal, 0.5), DS_QUANTILES_SKETCH(maxLongUniform) " + + "FROM foo " + + "GROUP BY 1" ); @Param({"5000000"}) @@ -374,7 +387,7 @@ public class SqlBenchmark @Param({"false", "force"}) private String vectorize; - @Param({"10", "15"}) + @Param({"20"}) private String query; @Nullable @@ -413,7 +426,7 @@ public class SqlBenchmark plannerFactory = new PlannerFactory( rootSchema, CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), - CalciteTests.createOperatorTable(), + createOperatorTable(), CalciteTests.createExprMacroTable(), plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER, @@ -422,6 +435,21 @@ public class SqlBenchmark ); } + private static DruidOperatorTable createOperatorTable() + { + try { + final Set extractionOperators = new HashSet<>(); + extractionOperators.add(CalciteTests.INJECTOR.getInstance(QueryLookupOperatorConversion.class)); + final Set aggregators = new HashSet<>(); + aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchApproxQuantileSqlAggregator.class)); + aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchObjectSqlAggregator.class)); + return new DruidOperatorTable(aggregators, extractionOperators); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + @TearDown(Level.Trial) public void tearDown() throws Exception { diff --git a/docs/development/extensions-core/datasketches-quantiles.md b/docs/development/extensions-core/datasketches-quantiles.md index bf9726ee2fb..850d84846c4 100644 --- a/docs/development/extensions-core/datasketches-quantiles.md +++ b/docs/development/extensions-core/datasketches-quantiles.md @@ -56,6 +56,7 @@ The result of the aggregation is a DoublesSketch that is the union of all sketch |name|A String for the output (result) name of the calculation.|yes| |fieldName|A String for the name of the input field (can contain sketches or raw numeric values).|yes| |k|Parameter that determines the accuracy and size of the sketch. Higher k means higher accuracy but more space to store sketches. Must be a power of 2 from 2 to 32768. See [accuracy information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch) in the DataSketches documentation for details.|no, defaults to 128| +|maxStreamLength|This parameter is a temporary solution to avoid a [known issue](https://github.com/apache/druid/issues/11544). It may be removed in a future release after the bug is fixed. This parameter defines the maximum number of items to store in each sketch. If a sketch reaches the limit, the query can throw `IllegalStateException`. To workaround this issue, increase the maximum stream length. See [accuracy information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch) in the DataSketches documentation for how many bytes are required per stream length.|no, defaults to 1000000000| ### Post Aggregators diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 4d203f477b3..f9c765bf7c0 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -339,9 +339,9 @@ Only the COUNT, ARRAY_AGG, and STRING_AGG aggregations can accept the DISTINCT k |`DS_HLL(expr, [lgK, tgtHllType])`|Creates an [HLL sketch](../development/extensions-core/datasketches-hll.md) on the values of expr, which can be a regular column or a column containing HLL sketches. The `lgK` and `tgtHllType` parameters are described in the HLL sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function.|`'0'` (STRING)| |`DS_THETA(expr, [size])`|Creates a [Theta sketch](../development/extensions-core/datasketches-theta.md) on the values of expr, which can be a regular column or a column containing Theta sketches. The `size` parameter is described in the Theta sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function.|`'0.0'` (STRING)| |`APPROX_QUANTILE(expr, probability, [resolution])`|_Deprecated._ Use `APPROX_QUANTILE_DS` instead, which provides a superior distribution-independent algorithm with formal error guarantees.

Computes approximate quantiles on numeric or [approxHistogram](../development/extensions-core/approximate-histograms.md#approximate-histogram-aggregator) exprs. The "probability" should be between 0 and 1 (exclusive). The "resolution" is the number of centroids to use for the computation. Higher resolutions will give more precise results but also have higher overhead. If not provided, the default resolution is 50. The [approximate histogram extension](../development/extensions-core/approximate-histograms.md) must be loaded to use this function.|`NaN`| -|`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles on numeric or [Quantiles sketch](../development/extensions-core/datasketches-quantiles.md) exprs. The "probability" should be between 0 and 1 (exclusive). The `k` parameter is described in the Quantiles sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function.|`NaN`| +|`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles on numeric or [Quantiles sketch](../development/extensions-core/datasketches-quantiles.md) exprs. Allowable "probability" values are between 0 and 1, exclusive. The `k` parameter is described in the Quantiles sketch documentation. You must load [DataSketches extension](../development/extensions-core/datasketches-extension.md) to use this function.

See the [known issue](#a-known-issue-with-approximate-functions-based-on-data-sketches) with this function.|`NaN`| |`APPROX_QUANTILE_FIXED_BUCKETS(expr, probability, numBuckets, lowerLimit, upperLimit, [outlierHandlingMode])`|Computes approximate quantiles on numeric or [fixed buckets histogram](../development/extensions-core/approximate-histograms.md#fixed-buckets-histogram) exprs. The "probability" should be between 0 and 1 (exclusive). The `numBuckets`, `lowerLimit`, `upperLimit`, and `outlierHandlingMode` parameters are described in the fixed buckets histogram documentation. The [approximate histogram extension](../development/extensions-core/approximate-histograms.md) must be loaded to use this function.|`0.0`| -|`DS_QUANTILES_SKETCH(expr, [k])`|Creates a [Quantiles sketch](../development/extensions-core/datasketches-quantiles.md) on the values of expr, which can be a regular column or a column containing quantiles sketches. The `k` parameter is described in the Quantiles sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use this function.|`'0'` (STRING)| +|`DS_QUANTILES_SKETCH(expr, [k])`|Creates a [Quantiles sketch](../development/extensions-core/datasketches-quantiles.md) on the values of `expr`, which can be a regular column or a column containing quantiles sketches. The `k` parameter is described in the Quantiles sketch documentation. You must load the [DataSketches extension](../development/extensions-core/datasketches-extension.md) to use this function.

See the [known issue](#a-known-issue-with-approximate-functions-based-on-data-sketches) with this function.|`'0'` (STRING)| |`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, with `numEntries` maximum number of distinct values before false positive rate increases. See [bloom filter extension](../development/extensions-core/bloom-filter.md) documentation for additional details.|Empty base64 encoded bloom filter STRING| |`TDIGEST_QUANTILE(expr, quantileFraction, [compression])`|Builds a T-Digest sketch on values produced by `expr` and returns the value for the quantile. Compression parameter (default value 100) determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.md) documentation for additional details.|`Double.NaN`| |`TDIGEST_GENERATE_SKETCH(expr, [compression])`|Builds a T-Digest sketch on values produced by `expr`. Compression parameter (default value 100) determines the accuracy and size of the sketch Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.md) documentation for additional details.|Empty base64 encoded T-Digest sketch STRING| @@ -820,6 +820,16 @@ either through query context or through Broker configuration. - Aggregation functions that are labeled as using sketches or approximations, such as APPROX_COUNT_DISTINCT, are always approximate, regardless of configuration. +#### A known issue with approximate functions based on data sketches + +The `APPROX_QUANTILE_DS` and `DS_QUANTILES_SKETCH` functions can fail with an `IllegalStateException` if one of the sketches for +the query hits `maxStreamLength`: the maximum number of items to store in each sketch. +See [GitHub issue 11544](https://github.com/apache/druid/issues/11544) for more details. +To workaround the issue, increase value of the maximum string length with the `approxQuantileDsMaxStreamLength` parameter +in the query context. Since it is set to 1,000,000,000 by default, you don't need to override it in most cases. +See [accuracy information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch) in the DataSketches documentation for how many bytes are required per stream length. +This query context parameter is a temporary solution to avoid the known issue. It may be removed in a future release after the bug is fixed. + ### Unsupported features Druid does not support all SQL features. In particular, the following features are not supported. diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java index 27da4a21ab3..443727975fb 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java @@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import org.apache.datasketches.Util; import org.apache.datasketches.quantiles.DoublesSketch; import org.apache.datasketches.quantiles.DoublesUnion; @@ -63,24 +64,42 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory public static final int DEFAULT_K = 128; // Used for sketch size estimation. - private static final long MAX_STREAM_LENGTH = 1_000_000_000; + public static final long DEFAULT_MAX_STREAM_LENGTH = 1_000_000_000; private final String name; private final String fieldName; private final int k; + private final long maxStreamLength; private final byte cacheTypeId; @JsonCreator public DoublesSketchAggregatorFactory( @JsonProperty("name") final String name, @JsonProperty("fieldName") final String fieldName, - @JsonProperty("k") final Integer k + @JsonProperty("k") @Nullable final Integer k, + @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength ) { - this(name, fieldName, k, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID); + this(name, fieldName, k, maxStreamLength, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID); } - DoublesSketchAggregatorFactory(final String name, final String fieldName, final Integer k, final byte cacheTypeId) + @VisibleForTesting + public DoublesSketchAggregatorFactory( + final String name, + final String fieldName, + @Nullable final Integer k + ) + { + this(name, fieldName, k, null); + } + + DoublesSketchAggregatorFactory( + final String name, + final String fieldName, + @Nullable final Integer k, + @Nullable final Long maxStreamLength, + final byte cacheTypeId + ) { if (name == null) { throw new IAE("Must have a valid, non-null aggregator name"); @@ -92,6 +111,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory this.fieldName = fieldName; this.k = k == null ? DEFAULT_K : k; Util.checkIfPowerOf2(this.k, "k"); + this.maxStreamLength = maxStreamLength == null ? DEFAULT_MAX_STREAM_LENGTH : maxStreamLength; this.cacheTypeId = cacheTypeId; } @@ -266,6 +286,12 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory return k; } + @JsonProperty + public long getMaxStreamLength() + { + return maxStreamLength; + } + @Override public List requiredFields() { @@ -278,7 +304,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory @Override public int getMaxIntermediateSize() { - return DoublesSketch.getUpdatableStorageBytes(k, MAX_STREAM_LENGTH); + return DoublesSketch.getUpdatableStorageBytes(k, maxStreamLength); } @Override @@ -288,7 +314,8 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory new DoublesSketchAggregatorFactory( fieldName, fieldName, - k + k, + maxStreamLength ) ); } @@ -296,7 +323,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory @Override public AggregatorFactory getCombiningFactory() { - return new DoublesSketchMergeAggregatorFactory(name, k); + return new DoublesSketchMergeAggregatorFactory(name, k, maxStreamLength); } @Override @@ -306,7 +333,11 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory // DoublesUnion supports inputs with different k. // The result will have effective k between the specified k and the minimum k from all input sketches // to achieve higher accuracy as much as possible. - return new DoublesSketchMergeAggregatorFactory(name, Math.max(k, ((DoublesSketchAggregatorFactory) other).k)); + return new DoublesSketchMergeAggregatorFactory( + name, + Math.max(k, ((DoublesSketchAggregatorFactory) other).k), + maxStreamLength + ); } else { throw new AggregatorFactoryNotMergeableException(this, other); } @@ -343,35 +374,30 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory @Override public byte[] getCacheKey() { + // maxStreamLength is not included in the cache key as it does nothing with query result. return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build(); } @Override - public boolean equals(final Object o) + public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || !getClass().equals(o.getClass())) { + if (o == null || getClass() != o.getClass()) { return false; } - final DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) o; - if (!name.equals(that.name)) { - return false; - } - if (!fieldName.equals(that.fieldName)) { - return false; - } - if (k != that.k) { - return false; - } - return true; + DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) o; + return k == that.k + && maxStreamLength == that.maxStreamLength + && name.equals(that.name) + && fieldName.equals(that.fieldName); } @Override public int hashCode() { - return Objects.hash(name, fieldName, k); // no need to use cacheTypeId here + return Objects.hash(name, fieldName, k, maxStreamLength); // no need to use cacheTypeId here } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java index 74be19bfd27..c2529ac7c30 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java @@ -57,7 +57,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator } final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position); - sketch.update(selector.getDouble()); + DoublesSketches.handleMaxStreamLengthLimit(() -> sketch.update(selector.getDouble())); } @Nullable diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java index af29c5b9cde..c1074f5f0bc 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java @@ -55,11 +55,13 @@ public class DoublesSketchBuildVectorAggregator implements VectorAggregator final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position); - for (int i = startRow; i < endRow; i++) { - if (nulls == null || !nulls[i]) { - sketch.update(doubles[i]); + DoublesSketches.handleMaxStreamLengthLimit(() -> { + for (int i = startRow; i < endRow; i++) { + if (nulls == null || !nulls[i]) { + sketch.update(doubles[i]); + } } - } + }); } @Override @@ -74,14 +76,16 @@ public class DoublesSketchBuildVectorAggregator implements VectorAggregator final double[] doubles = selector.getDoubleVector(); final boolean[] nulls = selector.getNullVector(); - for (int i = 0; i < numRows; i++) { - final int idx = rows != null ? rows[i] : i; + DoublesSketches.handleMaxStreamLengthLimit(() -> { + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? rows[i] : i; - if (nulls == null || !nulls[idx]) { - final int position = positions[i] + positionOffset; - helper.getSketchAtPosition(buf, position).update(doubles[idx]); + if (nulls == null || !nulls[idx]) { + final int position = positions[i] + positionOffset; + helper.getSketchAtPosition(buf, position).update(doubles[idx]); + } } - } + }); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java index 6693742aefd..a5f12d2227d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java @@ -76,10 +76,12 @@ public class DoublesSketchMergeAggregator implements Aggregator if (object == null) { return; } - if (object instanceof DoublesSketch) { - union.update((DoublesSketch) object); - } else { - union.update(selector.getDouble()); - } + DoublesSketches.handleMaxStreamLengthLimit(() -> { + if (object instanceof DoublesSketch) { + union.update((DoublesSketch) object); + } else { + union.update(selector.getDouble()); + } + }); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java index 14e8114e3fc..c8cd6b0ad13 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java @@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import org.apache.datasketches.quantiles.DoublesSketch; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorUtil; @@ -29,15 +30,28 @@ import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import javax.annotation.Nullable; + public class DoublesSketchMergeAggregatorFactory extends DoublesSketchAggregatorFactory { @JsonCreator public DoublesSketchMergeAggregatorFactory( @JsonProperty("name") final String name, - @JsonProperty("k") final Integer k) + @JsonProperty("k") @Nullable final Integer k, + @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength + ) { - super(name, name, k, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID); + super(name, name, k, maxStreamLength, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID); + } + + @VisibleForTesting + DoublesSketchMergeAggregatorFactory( + final String name, + @Nullable final Integer k + ) + { + this(name, k, null); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java index 8a8e10b48a0..92437d09bec 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java @@ -55,12 +55,14 @@ public class DoublesSketchMergeVectorAggregator implements VectorAggregator final DoublesUnion union = helper.getSketchAtPosition(buf, position); - for (int i = startRow; i < endRow; i++) { - final DoublesSketch sketch = (DoublesSketch) vector[i]; - if (sketch != null) { - union.update(sketch); + DoublesSketches.handleMaxStreamLengthLimit(() -> { + for (int i = startRow; i < endRow; i++) { + final DoublesSketch sketch = (DoublesSketch) vector[i]; + if (sketch != null) { + union.update(sketch); + } } - } + }); } @Override @@ -74,15 +76,17 @@ public class DoublesSketchMergeVectorAggregator implements VectorAggregator { final Object[] vector = selector.getObjectVector(); - for (int i = 0; i < numRows; i++) { - final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i]; + DoublesSketches.handleMaxStreamLengthLimit(() -> { + for (int i = 0; i < numRows; i++) { + final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i]; - if (sketch != null) { - final int position = positions[i] + positionOffset; - final DoublesUnion union = helper.getSketchAtPosition(buf, position); - union.update(sketch); + if (sketch != null) { + final int position = positions[i] + positionOffset; + final DoublesUnion union = helper.getSketchAtPosition(buf, position); + union.update(sketch); + } } - } + }); } @Nullable diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java new file mode 100644 index 00000000000..d7fc420e7f3 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import org.apache.druid.java.util.common.ISE; + +public final class DoublesSketches +{ + /** + * Runs the given task that updates a Doubles sketch backed by a direct byte buffer. This method intentionally + * accepts the update task as a {@link Runnable} instead of accpeting parameters of a sketch and other values + * needed for the update. This is to avoid any potential performance impact especially when the sketch is updated + * in a tight loop. The update task can throw NullPointerException because of the known issue filed in + * https://github.com/apache/druid/issues/11544. This method catches NPE and converts it to a more user-friendly + * exception. This method should be removed once the known bug above is fixed. + */ + public static void handleMaxStreamLengthLimit(Runnable updateSketchTask) + { + try { + updateSketchTask.run(); + } + catch (NullPointerException e) { + throw new ISE( + e, + "NullPointerException was thrown while updating Doubles sketch. " + + "This exception could be potentially because of the known bug filed in https://github.com/apache/druid/issues/11544. " + + "You could try a higher maxStreamLength than current to work around this bug if that is the case. " + + "See https://druid.apache.org/docs/latest/development/extensions-core/datasketches-quantiles.html for more details." + ); + } + } + + private DoublesSketches() + { + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java index 69a07dd3689..8cab843126f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java @@ -32,6 +32,7 @@ import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.Numbers; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; @@ -51,9 +52,12 @@ import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; import javax.annotation.Nullable; import java.util.List; +import java.util.Map; public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator { + public static final String CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH = "approxQuantileDsMaxStreamLength"; + private static final SqlAggFunction FUNCTION_INSTANCE = new DoublesSketchApproxQuantileSqlAggFunction(); private static final String NAME = "APPROX_QUANTILE_DS"; @@ -169,7 +173,8 @@ public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator aggregatorFactory = new DoublesSketchAggregatorFactory( histogramName, input.getDirectColumn(), - k + k, + getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext()) ); } else { VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( @@ -180,7 +185,8 @@ public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator aggregatorFactory = new DoublesSketchAggregatorFactory( histogramName, virtualColumn.getOutputName(), - k + k, + getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext()) ); } @@ -197,6 +203,13 @@ public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator ); } + @Nullable + static Long getMaxStreamLengthFromQueryContext(Map queryContext) + { + final Object val = queryContext.get(CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH); + return val == null ? null : Numbers.parseLong(val); + } + private static class DoublesSketchApproxQuantileSqlAggFunction extends SqlAggFunction { private static final String SIGNATURE1 = "'" + NAME + "(column, probability)'\n"; diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java index 76d3e9cbe93..d9b84e7145f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java @@ -113,7 +113,8 @@ public class DoublesSketchObjectSqlAggregator implements SqlAggregator aggregatorFactory = new DoublesSketchAggregatorFactory( histogramName, input.getDirectColumn(), - k + k, + DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext()) ); } else { VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( @@ -124,7 +125,8 @@ public class DoublesSketchObjectSqlAggregator implements SqlAggregator aggregatorFactory = new DoublesSketchAggregatorFactory( histogramName, virtualColumn.getOutputName(), - k + k, + DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext()) ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java index 0b5f80ee938..c18212cb22b 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java @@ -19,8 +19,13 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Druids; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; @@ -31,8 +36,73 @@ import org.apache.druid.segment.column.ValueType; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; + public class DoublesSketchAggregatorFactoryTest { + @Test + public void testEquals() + { + EqualsVerifier.forClass(DoublesSketchAggregatorFactory.class) + .withNonnullFields("name", "fieldName") + .withIgnoredFields("cacheTypeId") + .usingGetClass() + .verify(); + } + + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerSubtypes(new NamedType(DoublesSketchAggregatorFactory.class, DoublesSketchModule.DOUBLES_SKETCH)); + final DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory( + "myFactory", + "myField", + 1024, + 1000L + ); + final byte[] json = mapper.writeValueAsBytes(factory); + final DoublesSketchAggregatorFactory fromJson = (DoublesSketchAggregatorFactory) mapper.readValue( + json, + AggregatorFactory.class + ); + Assert.assertEquals(factory, fromJson); + } + + @Test + public void testDefaultParams() + { + final DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory( + "myFactory", + "myField", + null, + null + ); + + Assert.assertEquals(DoublesSketchAggregatorFactory.DEFAULT_K, factory.getK()); + Assert.assertEquals(DoublesSketchAggregatorFactory.DEFAULT_MAX_STREAM_LENGTH, factory.getMaxStreamLength()); + } + + @Test + public void testMaxIntermediateSize() + { + DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory( + "myFactory", + "myField", + 128, + null + ); + Assert.assertEquals(24608L, factory.getMaxIntermediateSize()); + + factory = new DoublesSketchAggregatorFactory( + "myFactory", + "myField", + 128, + 1_000_000_000_000L + ); + Assert.assertEquals(34848L, factory.getMaxIntermediateSize()); + } + @Test public void testResultArraySignature() { diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java index 81523692269..d20f369a9d4 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java @@ -31,11 +31,15 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.testing.InitializedNullHandlingTest; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.junit.After; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -49,15 +53,19 @@ import java.util.List; @RunWith(Parameterized.class) public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest { - + private final GroupByQueryConfig config; private final AggregationTestHelper helper; private final AggregationTestHelper timeSeriesHelper; @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize) { + this.config = config; DoublesSketchModule.registerSerde(); DoublesSketchModule module = new DoublesSketchModule(); helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( @@ -534,4 +542,130 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest List results = seq.toList(); Assert.assertEquals(1, results.size()); } + + @Test + public void testFailureWhenMaxStreamLengthHit() throws Exception + { + if (GroupByStrategySelector.STRATEGY_V1.equals(config.getDefaultStrategy())) { + expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class)); + expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch"); + + helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()), + String.join( + "\n", + "{", + " \"type\": \"string\",", + " \"parseSpec\": {", + " \"format\": \"tsv\",", + " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},", + " \"dimensionsSpec\": {", + " \"dimensions\": [\"sequenceNumber\", \"product\"],", + " \"dimensionExclusions\": [],", + " \"spatialDimensions\": []", + " },", + " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", + " }", + "}" + ), + "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]", + 0, // minTimestamp + Granularities.NONE, + 10, // maxRowCount + String.join( + "\n", + "{", + " \"queryType\": \"groupBy\",", + " \"dataSource\": \"test_datasource\",", + " \"granularity\": \"ALL\",", + " \"dimensions\": [],", + " \"aggregations\": [", + " {\"type\": \"quantilesDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 128, \"maxStreamLength\": 10}", + " ],", + " \"postAggregations\": [", + " {\"type\": \"quantilesDoublesSketchToQuantile\", \"name\": \"quantile\", \"fraction\": 0.5, \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},", + " {\"type\": \"quantilesDoublesSketchToQuantiles\", \"name\": \"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},", + " {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", + " ],", + " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", + "}" + ) + ); + } else { + Sequence seq = helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()), + String.join( + "\n", + "{", + " \"type\": \"string\",", + " \"parseSpec\": {", + " \"format\": \"tsv\",", + " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},", + " \"dimensionsSpec\": {", + " \"dimensions\": [\"sequenceNumber\", \"product\"],", + " \"dimensionExclusions\": [],", + " \"spatialDimensions\": []", + " },", + " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", + " }", + "}" + ), + "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]", + 0, // minTimestamp + Granularities.NONE, + 10, // maxRowCount + String.join( + "\n", + "{", + " \"queryType\": \"groupBy\",", + " \"dataSource\": \"test_datasource\",", + " \"granularity\": \"ALL\",", + " \"dimensions\": [],", + " \"aggregations\": [", + " {\"type\": \"quantilesDoublesSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 128, \"maxStreamLength\": 10}", + " ],", + " \"postAggregations\": [", + " {\"type\": \"quantilesDoublesSketchToQuantile\", \"name\": \"quantile\", \"fraction\": 0.5, \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},", + " {\"type\": \"quantilesDoublesSketchToQuantiles\", \"name\": \"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},", + " {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": \"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", + " ],", + " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", + "}" + ) + ); + + expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class)); + expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch"); + seq.toList(); + } + } + + private static class RecursiveExceptionMatcher extends BaseMatcher + { + private final Class expected; + + private RecursiveExceptionMatcher(Class expected) + { + this.expected = expected; + } + + @Override + public boolean matches(Object item) + { + if (expected.isInstance(item)) { + return true; + } else if (item instanceof Throwable) { + if (((Throwable) item).getCause() != null) { + return matches(((Throwable) item).getCause()); + } + } + return false; + } + + @Override + public void describeTo(Description description) + { + description.appendText("a recursive instance of ").appendText(expected.getName()); + } + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java new file mode 100644 index 00000000000..55d0b124624 --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.quantiles; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class DoublesSketchMergeAggregatorFactoryTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(DoublesSketchMergeAggregatorFactory.class) + .withNonnullFields("name", "fieldName") + .withIgnoredFields("cacheTypeId") + .usingGetClass() + .verify(); + } + + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerSubtypes( + new NamedType(DoublesSketchMergeAggregatorFactory.class, DoublesSketchModule.DOUBLES_SKETCH_MERGE) + ); + final DoublesSketchMergeAggregatorFactory factory = new DoublesSketchMergeAggregatorFactory( + "myFactory", + 1024, + 1000L + ); + final byte[] json = mapper.writeValueAsBytes(factory); + final DoublesSketchMergeAggregatorFactory fromJson = (DoublesSketchMergeAggregatorFactory) mapper.readValue( + json, + AggregatorFactory.class + ); + Assert.assertEquals(factory, fromJson); + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index 4ebd7c9c8ff..bf550312b22 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -67,7 +67,9 @@ import org.junit.Test; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest { @@ -821,6 +823,43 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest ); } + @Test + public void testFailWithSmallMaxStreamLength() throws Exception + { + final Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + context.put( + DoublesSketchApproxQuantileSqlAggregator.CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH, + 1 + ); + testQueryThrows( + "SELECT\n" + + "APPROX_QUANTILE_DS(m1, 0.01),\n" + + "APPROX_QUANTILE_DS(cnt, 0.5)\n" + + "FROM foo", + context, + Collections.singletonList( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators(ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "m1", null, 1L), + new DoublesSketchAggregatorFactory("a1:agg", "cnt", null, 1L) + )) + .postAggregators( + new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), + new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.50f) + ) + .context(context) + .build() + ), + expectedException -> { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch"); + } + ); + } + private static PostAggregator makeFieldAccessPostAgg(String name) { return new FieldAccessPostAggregator(name, name); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 54d2971627a..83cc761ae1e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -112,7 +112,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, 0, - 14312, + 14370, 0, 0, 2, @@ -130,7 +130,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, 0, - 22481, + 22568, 0, 0, 3, @@ -246,8 +246,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest getAndAssertCompactionStatus( fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, - 14312, - 14311, + 14370, + 14369, 0, 2, 2, @@ -255,7 +255,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest 1, 1, 0); - Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312"); + Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14370"); // Run compaction again to compact the remaining day // Remaining day compacted (1 new segment). Now both days compacted (2 total) forceTriggerAutoCompaction(2); @@ -266,7 +266,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, 0, - 22481, + 22568, 0, 0, 3, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index b4c36829197..876ea5c9144 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -99,6 +99,7 @@ import org.junit.Rule; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -106,6 +107,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -760,6 +762,14 @@ public class BaseCalciteQueryTest extends CalciteTestBase Assert.assertEquals(StringUtils.format("result count: %s", sql), expectedResults.size(), results.size()); assertResultsEquals(sql, expectedResults, results); + verifyQueries(sql, expectedQueries); + } + + private void verifyQueries( + final String sql, + @Nullable final List expectedQueries + ) + { if (expectedQueries != null) { final List recordedQueries = queryLogHook.getRecordedQueries(); @@ -789,6 +799,74 @@ public class BaseCalciteQueryTest extends CalciteTestBase } } + public void testQueryThrows( + final String sql, + final Map queryContext, + final List expectedQueries, + final Consumer expectedExceptionInitializer + ) throws Exception + { + testQueryThrows( + PLANNER_CONFIG_DEFAULT, + queryContext, + DEFAULT_PARAMETERS, + sql, + CalciteTests.REGULAR_USER_AUTH_RESULT, + expectedQueries, + expectedExceptionInitializer + ); + } + + public void testQueryThrows( + final PlannerConfig plannerConfig, + final Map queryContext, + final List parameters, + final String sql, + final AuthenticationResult authenticationResult, + final List expectedQueries, + final Consumer expectedExceptionInitializer + ) throws Exception + { + log.info("SQL: %s", sql); + + final List vectorizeValues = new ArrayList<>(); + + vectorizeValues.add("false"); + + if (!skipVectorize) { + vectorizeValues.add("force"); + } + + for (final String vectorize : vectorizeValues) { + queryLogHook.clearRecordedQueries(); + + final Map theQueryContext = new HashMap<>(queryContext); + theQueryContext.put(QueryContexts.VECTORIZE_KEY, vectorize); + theQueryContext.put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize); + + if (!"false".equals(vectorize)) { + theQueryContext.put(QueryContexts.VECTOR_SIZE_KEY, 2); // Small vector size to ensure we use more than one. + } + + final List theQueries = new ArrayList<>(); + for (Query query : expectedQueries) { + theQueries.add(recursivelyOverrideContext(query, theQueryContext)); + } + + if (cannotVectorize && "force".equals(vectorize)) { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("Cannot vectorize"); + } else { + expectedExceptionInitializer.accept(expectedException); + } + + // this should validate expectedException + getResults(plannerConfig, theQueryContext, parameters, sql, authenticationResult); + + verifyQueries(sql, theQueries); + } + } + public Set analyzeResources( PlannerConfig plannerConfig, String sql, diff --git a/website/.spelling b/website/.spelling index 8d39a187ca8..017402cc0dc 100644 --- a/website/.spelling +++ b/website/.spelling @@ -682,6 +682,7 @@ tgtHllType - ../docs/development/extensions-core/datasketches-quantiles.md CDF DoublesSketch +maxStreamLength PMF quantilesDoublesSketch toString