Always return sketches from DS_HLL, DS_THETA, DS_QUANTILES_SKETCH. (#13247)

* Always return sketches from DS_HLL, DS_THETA, DS_QUANTILES_SKETCH.

These aggregation functions are documented as creating sketches. However,
they are planned into native aggregators that include finalization logic
to convert the sketch to a number of some sort. This creates an
inconsistency: the functions sometimes return sketches, and sometimes
return numbers, depending on where they lie in the native query plan.

This patch changes these SQL aggregators to _never_ finalize, by using
the "shouldFinalize" feature of the native aggregators. It already
existed for theta sketches. This patch adds the feature for hll and
quantiles sketches.

As to impact, Druid finalizes aggregators in two cases:

- When they appear in the outer level of a query (not a subquery).
- When they are used as input to an expression or finalizing-field-access
  post-aggregator (not any other kind of post-aggregator).

With this patch, the functions will no longer be finalized in these cases.

The second item is not likely to matter much. The SQL functions all declare
return type OTHER, which would be usable as an input to any other function
that makes sense and that would be planned into an expression.

So, the main effect of this patch is the first item. To provide backwards
compatibility with anyone that was depending on the old behavior, the
patch adds a "sqlFinalizeOuterSketches" query context parameter that
restores the old behavior.

Other changes:

1) Move various argument-checking logic from runtime to planning time in
   DoublesSketchListArgBaseOperatorConversion, by adding an OperandTypeChecker.

2) Add various JsonIgnores to the sketches to simplify their JSON representations.

3) Allow chaining of ExpressionPostAggregators and other PostAggregators
   in the SQL layer.

4) Avoid unnecessary FieldAccessPostAggregator wrapping in the SQL layer,
   now that expressions can operate on complex inputs.

5) Adjust return type to thetaSketch (instead of OTHER) in
   ThetaSketchSetBaseOperatorConversion.

* Fix benchmark class.

* Fix compilation error.

* Fix ThetaSketchSqlAggregatorTest.

* Hopefully fix ITAutoCompactionTest.

* Adjustment to ITAutoCompactionTest.
This commit is contained in:
Gian Merlino 2022-11-03 09:43:00 -07:00 committed by GitHub
parent d1877e41ec
commit 8f90589ce5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 1290 additions and 302 deletions

View File

@ -64,6 +64,7 @@ public class DataSketchesHllBenchmark
"hll",
null,
null,
null,
false
);

View File

@ -44,6 +44,7 @@
<And>
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
<Or>
<Class name="org.apache.druid.jackson.DefaultTrueJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanRowsLimitJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanTimeOrderJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$BatchSizeJsonIncludeFilter"/>

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import com.fasterxml.jackson.annotation.JsonInclude;
/**
* {@link JsonInclude} filter for boolean values that default to true.
*
* This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
* exclusions (see spotbugs-exclude.xml).
*/
@SuppressWarnings({"EqualsAndHashcode", "EqualsHashCode"})
public class DefaultTrueJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
{
@Override
public boolean equals(Object obj)
{
return obj == null || (obj instanceof Boolean && (boolean) obj);
}
}

View File

@ -36,13 +36,14 @@ Configure Druid SQL query planning using the parameters in the table below.
|Parameter|Description|Default value|
|---------|-----------|-------------|
|`sqlQueryId`|Unique identifier given to this SQL query. For HTTP client, it will be returned in `X-Druid-SQL-Query-Id` header.<br/><br/>To specify a unique identifier for SQL query, use `sqlQueryId` instead of [`queryId`](query-context.md). Setting `queryId` for a SQL request has no effect. All native queries underlying SQL use an auto-generated `queryId`.|auto-generated|
|`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|druid.sql.planner.sqlTimeZone on the Broker (default: UTC)|
|`sqlStringifyArrays`|When set to true, result columns which return array values will be serialized into a JSON string in the response instead of as an array (default: true, except for JDBC connections, where it is always false)|
|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)|
|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|druid.sql.planner.useGroupingSetForExactDistinct on the Broker (default: false)|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)|
|`enableTimeBoundaryPlanning`|If true, SQL queries will get converted to TimeBoundary queries wherever possible. TimeBoundary queries are very efficient for min-max calculation on __time column in a datasource |druid.query.default.context.enableTimeBoundaryPlanning on the Broker (default: false)|
|`useNativeQueryExplain`|If true, `EXPLAIN PLAN FOR` will return the explain plan as a JSON representation of equivalent native query(s), else it will return the original version of explain plan generated by Calcite.|`druid.sql.planner.useNativeQueryExplain` on the Broker (default: true)|
|`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|`druid.sql.planner.sqlTimeZone` on the Broker (default: UTC)|
|`sqlStringifyArrays`|When set to true, result columns which return array values will be serialized into a JSON string in the response instead of as an array|true, except for JDBC connections, where it is always false|
|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|`druid.sql.planner.useApproximateCountDistinct` on the Broker (default: true)|
|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|`druid.sql.planner.useGroupingSetForExactDistinct` on the Broker (default: false)|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|`druid.sql.planner.useApproximateTopN` on the Broker (default: true)|
|`enableTimeBoundaryPlanning`|If true, SQL queries will get converted to TimeBoundary queries wherever possible. TimeBoundary queries are very efficient for min-max calculation on __time column in a datasource |`druid.query.default.context.enableTimeBoundaryPlanning` on the Broker (default: false)|
|`useNativeQueryExplain`|If true, `EXPLAIN PLAN FOR` will return the explain plan as a JSON representation of equivalent native query(s), else it will return the original version of explain plan generated by Calcite.<br /><br />This property is provided for backwards compatibility. It is not recommended to use this parameter unless you were depending on the older behavior.|`druid.sql.planner.useNativeQueryExplain` on the Broker (default: true)|
|`sqlFinalizeOuterSketches`|If false (default behavior in Druid 25.0.0 and later), `DS_HLL`, `DS_THETA`, and `DS_QUANTILES_SKETCH` return sketches in query results, as documented. If true (default behavior in Druid 24.0.1 and earlier), sketches from these functions are finalized when they appear in query results.<br /><br />This property is provided for backwards compatibility with behavior in Druid 24.0.1 and earlier. It is not recommended to use this parameter unless you were depending on the older behavior. Instead, use a function that does not return a sketch, such as `APPROX_COUNT_DISTINCT_DS_HLL`, `APPROX_COUNT_DISTINCT_DS_THETA`, `APPROX_QUANTILE_DS`, `DS_THETA_ESTIMATE`, or `DS_GET_QUANTILE`.|`druid.query.default.context.sqlFinalizeOuterSketches` on the Broker (default: false)|
## Setting the query context
The query context parameters can be specified as a "context" object in the [JSON API](sql-api.md) or as a [JDBC connection properties object](sql-jdbc.md).

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.sql.calcite.planner.PlannerContext;
public class SketchQueryContext
{
public static final String CTX_FINALIZE_OUTER_SKETCHES = "sqlFinalizeOuterSketches";
public static final boolean DEFAULT_FINALIZE_OUTER_SKETCHES = false;
public static boolean isFinalizeOuterSketches(final PlannerContext plannerContext)
{
return QueryContexts.getAsBoolean(
CTX_FINALIZE_OUTER_SKETCHES,
plannerContext.queryContextMap().get(CTX_FINALIZE_OUTER_SKETCHES),
DEFAULT_FINALIZE_OUTER_SKETCHES
);
}
}

View File

@ -19,10 +19,12 @@
package org.apache.druid.query.aggregation.datasketches.hll;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
@ -42,6 +44,7 @@ import java.util.Objects;
public abstract class HllSketchAggregatorFactory extends AggregatorFactory
{
public static final boolean DEFAULT_ROUND = false;
public static final boolean DEFAULT_SHOULD_FINALIZE = true;
public static final int DEFAULT_LG_K = 12;
public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4;
@ -52,6 +55,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
private final String fieldName;
private final int lgK;
private final TgtHllType tgtHllType;
private final boolean shouldFinalize;
private final boolean round;
HllSketchAggregatorFactory(
@ -59,6 +63,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
final String fieldName,
@Nullable final Integer lgK,
@Nullable final String tgtHllType,
final Boolean shouldFinalize,
final boolean round
)
{
@ -66,6 +71,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
this.fieldName = Objects.requireNonNull(fieldName);
this.lgK = lgK == null ? DEFAULT_LG_K : lgK;
this.tgtHllType = tgtHllType == null ? DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType);
this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize;
this.round = round;
}
@ -95,6 +101,14 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
}
@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class)
public boolean isShouldFinalize()
{
return shouldFinalize;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isRound()
{
return round;
@ -114,7 +128,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(
new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), round)
new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), shouldFinalize, round)
);
}
@ -179,9 +193,10 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
@Override
public Object finalizeComputation(@Nullable final Object object)
{
if (object == null) {
return null;
if (!shouldFinalize || object == null) {
return object;
}
final HllSketch sketch = (HllSketch) object;
final double estimate = sketch.getEstimate();
@ -201,7 +216,14 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
@Override
public AggregatorFactory getCombiningFactory()
{
return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType(), isRound());
return new HllSketchMergeAggregatorFactory(
getName(),
getName(),
getLgK(),
getTgtHllType(),
isShouldFinalize(),
isRound()
);
}
@Override
@ -212,51 +234,41 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
}
@Override
public boolean equals(final Object object)
public boolean equals(Object o)
{
if (this == object) {
if (this == o) {
return true;
}
if (object == null || !getClass().equals(object.getClass())) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) object;
if (!name.equals(that.getName())) {
return false;
}
if (!fieldName.equals(that.getFieldName())) {
return false;
}
if (lgK != that.getLgK()) {
return false;
}
if (!tgtHllType.equals(that.tgtHllType)) {
return false;
}
if (round != that.round) {
return false;
}
return true;
HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) o;
return lgK == that.lgK
&& shouldFinalize == that.shouldFinalize
&& round == that.round
&& Objects.equals(name, that.name)
&& Objects.equals(fieldName, that.fieldName)
&& tgtHllType == that.tgtHllType;
}
@Override
public int hashCode()
{
return Objects.hash(name, fieldName, lgK, tgtHllType);
return Objects.hash(name, fieldName, lgK, tgtHllType, shouldFinalize, round);
}
@Override
public String toString()
{
return getClass().getSimpleName() + " {"
+ " name=" + name
+ ", fieldName=" + fieldName
+ ", lgK=" + lgK
+ ", tgtHllType=" + tgtHllType
+ ", round=" + round
+ " }";
return getClass().getSimpleName() + "{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
", lgK=" + lgK +
", tgtHllType=" + tgtHllType +
(shouldFinalize != DEFAULT_SHOULD_FINALIZE ? ", shouldFinalize=" + shouldFinalize : "") +
(round != DEFAULT_ROUND ? ", round=" + round : "") +
'}';
}
protected abstract byte getCacheTypeId();
}

View File

@ -53,10 +53,11 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("lgK") @Nullable final Integer lgK,
@JsonProperty("tgtHllType") @Nullable final String tgtHllType,
@JsonProperty("shouldFinalize") final Boolean shouldFinalize,
@JsonProperty("round") final boolean round
)
{
super(name, fieldName, lgK, tgtHllType, round);
super(name, fieldName, lgK, tgtHllType, shouldFinalize, round);
}
@ -125,7 +126,14 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
@Override
public AggregatorFactory withName(String newName)
{
return new HllSketchBuildAggregatorFactory(newName, getFieldName(), getLgK(), getTgtHllType(), isRound());
return new HllSketchBuildAggregatorFactory(
newName,
getFieldName(),
getLgK(),
getTgtHllType(),
isShouldFinalize(),
isRound()
);
}
private void validateInputs(@Nullable ColumnCapabilities capabilities)

View File

@ -52,10 +52,11 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("lgK") @Nullable final Integer lgK,
@JsonProperty("tgtHllType") @Nullable final String tgtHllType,
@JsonProperty("shouldFinalize") final Boolean shouldFinalize,
@JsonProperty("round") final boolean round
)
{
super(name, fieldName, lgK, tgtHllType, round);
super(name, fieldName, lgK, tgtHllType, shouldFinalize, round);
}
@Override
@ -64,16 +65,19 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory
if (other.getName().equals(this.getName()) && other instanceof HllSketchMergeAggregatorFactory) {
HllSketchMergeAggregatorFactory castedOther = (HllSketchMergeAggregatorFactory) other;
return new HllSketchMergeAggregatorFactory(
getName(),
getName(),
Math.max(getLgK(), castedOther.getLgK()),
getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType(),
isRound() || castedOther.isRound()
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
if (castedOther.isShouldFinalize() == isShouldFinalize()) {
return new HllSketchMergeAggregatorFactory(
getName(),
getName(),
Math.max(getLgK(), castedOther.getLgK()),
getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType(),
isShouldFinalize(),
isRound() || castedOther.isRound()
);
}
}
throw new AggregatorFactoryNotMergeableException(this, other);
}
@Override
@ -134,7 +138,14 @@ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory
@Override
public AggregatorFactory withName(String newName)
{
return new HllSketchMergeAggregatorFactory(newName, getFieldName(), getLgK(), getTgtHllType(), isRound());
return new HllSketchMergeAggregatorFactory(
newName,
getFieldName(),
getLgK(),
getTgtHllType(),
isShouldFinalize(),
isRound()
);
}
}

View File

@ -40,6 +40,11 @@ public class HllSketchApproxCountDistinctSqlAggregator extends HllSketchBaseSqlA
private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchApproxCountDistinctSqlAggFunction();
public HllSketchApproxCountDistinctSqlAggregator()
{
super(true);
}
@Override
public SqlAggFunction calciteFunction()
{

View File

@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
@ -51,6 +52,13 @@ public abstract class HllSketchBaseSqlAggregator implements SqlAggregator
{
private static final boolean ROUND = true;
private final boolean finalizeSketch;
protected HllSketchBaseSqlAggregator(boolean finalizeSketch)
{
this.finalizeSketch = finalizeSketch;
}
@Nullable
@Override
public Aggregation toDruidAggregation(
@ -118,12 +126,15 @@ public abstract class HllSketchBaseSqlAggregator implements SqlAggregator
final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
if (columnArg.isDirectColumnAccess()
&& rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
&& rowSignature.getColumnType(columnArg.getDirectColumn())
.map(type -> type.is(ValueType.COMPLEX))
.orElse(false)) {
aggregatorFactory = new HllSketchMergeAggregatorFactory(
aggregatorName,
columnArg.getDirectColumn(),
logK,
tgtHllType,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
ROUND
);
} else {
@ -154,6 +165,7 @@ public abstract class HllSketchBaseSqlAggregator implements SqlAggregator
dimensionSpec.getDimension(),
logK,
tgtHllType,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
ROUND
);
}

View File

@ -81,7 +81,8 @@ public class HllSketchEstimateOperatorConversion implements SqlOperatorConversio
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (firstOperand == null) {

View File

@ -80,7 +80,8 @@ public class HllSketchEstimateWithErrorBoundsOperatorConversion implements SqlOp
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (firstOperand == null) {

View File

@ -38,6 +38,11 @@ public class HllSketchObjectSqlAggregator extends HllSketchBaseSqlAggregator imp
private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchSqlAggFunction();
private static final String NAME = "DS_HLL";
public HllSketchObjectSqlAggregator()
{
super(false);
}
@Override
public SqlAggFunction calciteFunction()
{

View File

@ -94,7 +94,8 @@ public class HllSketchSetUnionOperatorConversion implements SqlOperatorConversio
plannerContext,
rowSignature,
operand,
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (convertedPostAgg == null) {
if (operandCounter == 0) {

View File

@ -77,7 +77,8 @@ public class HllSketchToStringOperatorConversion implements SqlOperatorConversio
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (firstOperand == null) {

View File

@ -20,11 +20,13 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.apache.datasketches.Util;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.DoublesUnion;
import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
@ -62,6 +64,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
Comparator.nullsFirst(Comparator.comparingLong(DoublesSketch::getN));
public static final int DEFAULT_K = 128;
public static final boolean DEFAULT_SHOULD_FINALIZE = true;
// Used for sketch size estimation.
public static final long DEFAULT_MAX_STREAM_LENGTH = 1_000_000_000;
@ -70,6 +73,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
private final String fieldName;
private final int k;
private final long maxStreamLength;
private final boolean shouldFinalize;
private final byte cacheTypeId;
@JsonCreator
@ -77,10 +81,18 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
@JsonProperty("name") final String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("k") @Nullable final Integer k,
@JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength
@JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength,
@JsonProperty("shouldFinalize") @Nullable final Boolean shouldFinalize
)
{
this(name, fieldName, k, maxStreamLength, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
this(
name,
fieldName,
k,
maxStreamLength,
shouldFinalize,
AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID
);
}
@VisibleForTesting
@ -90,7 +102,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
@Nullable final Integer k
)
{
this(name, fieldName, k, null);
this(name, fieldName, k, null, DEFAULT_SHOULD_FINALIZE);
}
DoublesSketchAggregatorFactory(
@ -98,6 +110,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
final String fieldName,
@Nullable final Integer k,
@Nullable final Long maxStreamLength,
@Nullable final Boolean shouldFinalize,
final byte cacheTypeId
)
{
@ -112,6 +125,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
this.k = k == null ? DEFAULT_K : k;
Util.checkIfPowerOf2(this.k, "k");
this.maxStreamLength = maxStreamLength == null ? DEFAULT_MAX_STREAM_LENGTH : maxStreamLength;
this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize;
this.cacheTypeId = cacheTypeId;
}
@ -292,6 +306,13 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
return maxStreamLength;
}
@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class)
public boolean isShouldFinalize()
{
return shouldFinalize;
}
@Override
public List<String> requiredFields()
{
@ -307,7 +328,14 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
@Override
public AggregatorFactory withName(String newName)
{
return new DoublesSketchAggregatorFactory(newName, getFieldName(), getK(), getMaxStreamLength(), cacheTypeId);
return new DoublesSketchAggregatorFactory(
newName,
getFieldName(),
getK(),
getMaxStreamLength(),
shouldFinalize,
cacheTypeId
);
}
// Quantiles sketches never stop growing, but they do so very slowly.
@ -327,7 +355,8 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
fieldName,
fieldName,
k,
maxStreamLength
maxStreamLength,
shouldFinalize
)
);
}
@ -335,31 +364,40 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
@Override
public AggregatorFactory getCombiningFactory()
{
return new DoublesSketchMergeAggregatorFactory(name, k, maxStreamLength);
return new DoublesSketchMergeAggregatorFactory(name, k, maxStreamLength, shouldFinalize);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && other instanceof DoublesSketchAggregatorFactory) {
// DoublesUnion supports inputs with different k.
// The result will have effective k between the specified k and the minimum k from all input sketches
// to achieve higher accuracy as much as possible.
return new DoublesSketchMergeAggregatorFactory(
name,
Math.max(k, ((DoublesSketchAggregatorFactory) other).k),
maxStreamLength
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
final DoublesSketchAggregatorFactory castedOther = (DoublesSketchAggregatorFactory) other;
if (castedOther.shouldFinalize == shouldFinalize) {
// DoublesUnion supports inputs with different k.
// The result will have effective k between the specified k and the minimum k from all input sketches
// to achieve higher accuracy as much as possible.
return new DoublesSketchMergeAggregatorFactory(
name,
Math.max(k, castedOther.k),
Math.max(maxStreamLength, castedOther.maxStreamLength),
shouldFinalize
);
}
}
throw new AggregatorFactoryNotMergeableException(this, other);
}
@Nullable
@Override
public Object finalizeComputation(@Nullable final Object object)
{
return object == null ? null : ((DoublesSketch) object).getN();
if (!shouldFinalize || object == null) {
return object;
}
return ((DoublesSketch) object).getN();
}
/**
@ -394,8 +432,11 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
return false;
}
DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) o;
// no need to use cacheTypeId here
return k == that.k
&& maxStreamLength == that.maxStreamLength
&& shouldFinalize == that.shouldFinalize
&& name.equals(that.name)
&& fieldName.equals(that.fieldName);
}
@ -403,7 +444,8 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
@Override
public int hashCode()
{
return Objects.hash(name, fieldName, k, maxStreamLength); // no need to use cacheTypeId here
// no need to use cacheTypeId here
return Objects.hash(name, fieldName, k, maxStreamLength, shouldFinalize);
}
@Override

View File

@ -40,10 +40,11 @@ public class DoublesSketchMergeAggregatorFactory extends DoublesSketchAggregator
public DoublesSketchMergeAggregatorFactory(
@JsonProperty("name") final String name,
@JsonProperty("k") @Nullable final Integer k,
@JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength
@JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength,
@JsonProperty("shouldFinalize") @Nullable final Boolean shouldFinalize
)
{
super(name, name, k, maxStreamLength, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID);
super(name, name, k, maxStreamLength, shouldFinalize, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID);
}
@VisibleForTesting
@ -52,7 +53,7 @@ public class DoublesSketchMergeAggregatorFactory extends DoublesSketchAggregator
@Nullable final Integer k
)
{
this(name, k, null);
this(name, k, null, null);
}
@Override
@ -78,6 +79,6 @@ public class DoublesSketchMergeAggregatorFactory extends DoublesSketchAggregator
@Override
public AggregatorFactory withName(String newName)
{
return new DoublesSketchMergeAggregatorFactory(newName, getK(), getMaxStreamLength());
return new DoublesSketchMergeAggregatorFactory(newName, getK(), getMaxStreamLength(), isShouldFinalize());
}
}

View File

@ -172,7 +172,8 @@ public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator
histogramName,
input.getDirectColumn(),
k,
getMaxStreamLengthFromQueryContext(plannerContext.queryContext())
getMaxStreamLengthFromQueryContext(plannerContext.queryContext()),
true
);
} else {
String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
@ -183,7 +184,8 @@ public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator
histogramName,
virtualColumnName,
k,
getMaxStreamLengthFromQueryContext(plannerContext.queryContext())
getMaxStreamLengthFromQueryContext(plannerContext.queryContext()),
true
);
}

View File

@ -19,17 +19,25 @@
package org.apache.druid.query.aggregation.datasketches.quantiles.sql;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Static;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
@ -72,47 +80,24 @@ public abstract class DoublesSketchListArgBaseOperatorConversion implements SqlO
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final double[] args = new double[operands.size() - 1];
PostAggregator inputSketchPostAgg = null;
int operandCounter = 0;
for (RexNode operand : operands) {
final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operand,
postAggregatorVisitor
);
if (convertedPostAgg == null) {
if (operandCounter > 0) {
try {
if (!operand.isA(SqlKind.LITERAL)) {
return null;
}
double arg = ((Number) RexLiteral.value(operand)).doubleValue();
args[operandCounter - 1] = arg;
}
catch (ClassCastException cce) {
return null;
}
} else {
return null;
}
} else {
if (operandCounter == 0) {
inputSketchPostAgg = convertedPostAgg;
} else {
if (!operand.isA(SqlKind.LITERAL)) {
return null;
}
}
}
operandCounter++;
}
final PostAggregator inputSketchPostAgg = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor,
true
);
if (inputSketchPostAgg == null) {
return null;
}
for (int i = 1; i < operands.size(); i++) {
RexNode operand = operands.get(i);
double arg = ((Number) RexLiteral.value(operand)).doubleValue();
args[i - 1] = arg;
}
return makePostAgg(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
inputSketchPostAgg,
@ -129,7 +114,7 @@ public abstract class DoublesSketchListArgBaseOperatorConversion implements SqlO
factory -> Calcites.createSqlType(factory, SqlTypeName.OTHER)
),
null,
OperandTypes.variadic(SqlOperandCountRanges.from(2)),
new DoublesSketchListArgOperandTypeChecker(),
SqlFunctionCategory.USER_DEFINED_FUNCTION
);
}
@ -141,4 +126,68 @@ public abstract class DoublesSketchListArgBaseOperatorConversion implements SqlO
PostAggregator field,
double[] args
);
/**
* Minimum 2 arguments. 2nd and further arguments must be literal numbers.
*/
private static class DoublesSketchListArgOperandTypeChecker implements SqlOperandTypeChecker
{
private static final int REQUIRED_OPERANDS = 2;
@Override
public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure)
{
for (int i = 1; i < callBinding.operands().size(); i++) {
final SqlNode operand = callBinding.operands().get(i);
final RelDataType operandType = callBinding.getValidator().deriveType(callBinding.getScope(), operand);
// Verify that 'operand' is a literal number.
if (!SqlUtil.isLiteral(operand)) {
return OperatorConversions.throwOrReturn(
throwOnFailure,
callBinding,
cb -> cb.getValidator()
.newValidationError(
operand,
Static.RESOURCE.argumentMustBeLiteral(callBinding.getOperator().getName())
)
);
}
if (!SqlTypeFamily.NUMERIC.contains(operandType)) {
return OperatorConversions.throwOrReturn(
throwOnFailure,
callBinding,
SqlCallBinding::newValidationSignatureError
);
}
}
return true;
}
@Override
public SqlOperandCountRange getOperandCountRange()
{
return SqlOperandCountRanges.from(REQUIRED_OPERANDS);
}
@Override
public String getAllowedSignatures(SqlOperator op, String opName)
{
return StringUtils.format("'%s(sketch, arg1, [arg2, ...])'", opName);
}
@Override
public Consistency getConsistency()
{
return Consistency.NONE;
}
@Override
public boolean isOptional(int i)
{
return i + 1 > REQUIRED_OPERANDS;
}
}
}

View File

@ -34,6 +34,7 @@ import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -114,7 +115,8 @@ public class DoublesSketchObjectSqlAggregator implements SqlAggregator
histogramName,
input.getDirectColumn(),
k,
DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext())
DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext()),
SketchQueryContext.isFinalizeOuterSketches(plannerContext)
);
} else {
String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
@ -125,7 +127,8 @@ public class DoublesSketchObjectSqlAggregator implements SqlAggregator
histogramName,
virtualColumnName,
k,
DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext())
DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext()),
SketchQueryContext.isFinalizeOuterSketches(plannerContext)
);
}

View File

@ -74,7 +74,8 @@ public abstract class DoublesSketchSingleArgBaseOperatorConversion implements Sq
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (firstOperand == null) {

View File

@ -77,7 +77,8 @@ public class DoublesSketchSummaryOperatorConversion implements SqlOperatorConver
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (firstOperand == null) {

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
@ -32,7 +33,6 @@ import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
@ -122,6 +122,7 @@ public class SketchEstimatePostAggregator implements PostAggregator
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getErrorBoundsStdDev()
{
return errorBoundsStdDev;

View File

@ -20,7 +20,9 @@
package org.apache.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.AggregatorUtil;
@ -94,12 +96,14 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
}
@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class)
public boolean getShouldFinalize()
{
return shouldFinalize;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean getIsInputThetaSketch()
{
return isInputThetaSketch;
@ -107,6 +111,7 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getErrorBoundsStdDev()
{
return errorBoundsStdDev;

View File

@ -40,6 +40,11 @@ public class ThetaSketchApproxCountDistinctSqlAggregator extends ThetaSketchBase
private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchSqlAggFunction();
public ThetaSketchApproxCountDistinctSqlAggregator()
{
super(true);
}
@Override
public SqlAggFunction calciteFunction()
{

View File

@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.theta.SketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -48,6 +49,13 @@ import java.util.List;
public abstract class ThetaSketchBaseSqlAggregator implements SqlAggregator
{
private final boolean finalizeSketch;
protected ThetaSketchBaseSqlAggregator(boolean finalizeSketch)
{
this.finalizeSketch = finalizeSketch;
}
@Nullable
@Override
public Aggregation toDruidAggregation(
@ -97,12 +105,14 @@ public abstract class ThetaSketchBaseSqlAggregator implements SqlAggregator
final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
if (columnArg.isDirectColumnAccess()
&& rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
&& rowSignature.getColumnType(columnArg.getDirectColumn())
.map(type -> type.is(ValueType.COMPLEX))
.orElse(false)) {
aggregatorFactory = new SketchMergeAggregatorFactory(
aggregatorName,
columnArg.getDirectColumn(),
sketchSize,
null,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
null,
null
);
@ -133,7 +143,7 @@ public abstract class ThetaSketchBaseSqlAggregator implements SqlAggregator
aggregatorName,
dimensionSpec.getDimension(),
sketchSize,
null,
finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
null,
null
);

View File

@ -77,7 +77,8 @@ public class ThetaSketchEstimateOperatorConversion implements SqlOperatorConvers
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (firstOperand == null) {

View File

@ -79,7 +79,8 @@ public class ThetaSketchEstimateWithErrorBoundsOperatorConversion implements Sql
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
postAggregatorVisitor,
true
);
if (firstOperand == null) {

View File

@ -24,9 +24,7 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
@ -38,6 +36,11 @@ public class ThetaSketchObjectSqlAggregator extends ThetaSketchBaseSqlAggregator
private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchObjectSqlAggFunction();
private static final String NAME = "DS_THETA";
public ThetaSketchObjectSqlAggregator()
{
super(false);
}
@Override
public SqlAggFunction calciteFunction()
{
@ -67,7 +70,7 @@ public class ThetaSketchObjectSqlAggregator extends ThetaSketchBaseSqlAggregator
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.OTHER),
ThetaSketchSqlOperators.RETURN_TYPE_INFERENCE,
InferTypes.VARCHAR_1024,
OperandTypes.or(
OperandTypes.ANY,

View File

@ -27,9 +27,8 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchSetPostAggregator;
@ -38,7 +37,6 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
@ -81,31 +79,24 @@ public abstract class ThetaSketchSetBaseOperatorConversion implements SqlOperato
final List<PostAggregator> inputPostAggs = new ArrayList<>();
Integer size = null;
int operandCounter = 0;
for (RexNode operand : operands) {
final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operand,
postAggregatorVisitor
);
if (convertedPostAgg == null) {
if (operandCounter == 0) {
try {
if (!operand.isA(SqlKind.LITERAL)) {
return null;
}
size = RexLiteral.intValue(operand);
}
catch (ClassCastException cce) {
return null;
}
} else {
return null;
}
for (int i = 0; i < operands.size(); i++) {
RexNode operand = operands.get(i);
if (i == 0 && operand.isA(SqlKind.LITERAL) && SqlTypeFamily.INTEGER.contains(operand.getType())) {
size = RexLiteral.intValue(operand);
} else {
inputPostAggs.add(convertedPostAgg);
operandCounter++;
final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operand,
postAggregatorVisitor,
true
);
if (convertedPostAgg == null) {
return null;
} else {
inputPostAggs.add(convertedPostAgg);
}
}
}
@ -122,9 +113,7 @@ public abstract class ThetaSketchSetBaseOperatorConversion implements SqlOperato
return new SqlFunction(
getFunctionName(),
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(
factory -> Calcites.createSqlType(factory, SqlTypeName.OTHER)
),
ThetaSketchSqlOperators.RETURN_TYPE_INFERENCE,
null,
OperandTypes.variadic(SqlOperandCountRanges.from(2)),
SqlFunctionCategory.USER_DEFINED_FUNCTION

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.table.RowSignatures;
public class ThetaSketchSqlOperators
{
public static final SqlReturnTypeInference RETURN_TYPE_INFERENCE =
opBinding -> RowSignatures.makeComplexType(
opBinding.getTypeFactory(),
ColumnType.ofComplex(SketchModule.THETA_SKETCH),
true
);
}

View File

@ -45,6 +45,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class HllSketchAggregatorFactoryTest
@ -80,6 +81,7 @@ public class HllSketchAggregatorFactoryTest
Assert.assertEquals(FIELD_NAME, aggregatorFactory.getFieldName());
Assert.assertEquals(LG_K, aggregatorFactory.getLgK());
Assert.assertEquals(TGT_HLL_TYPE, aggregatorFactory.getTgtHllType());
Assert.assertEquals(HllSketchAggregatorFactory.DEFAULT_SHOULD_FINALIZE, aggregatorFactory.isShouldFinalize());
Assert.assertEquals(ROUND, aggregatorFactory.isRound());
}
@ -236,8 +238,13 @@ public class HllSketchAggregatorFactoryTest
.collect(Collectors.toList());
for (Field field : toStringFields) {
String expectedToken = formatFieldForToString(field);
Assert.assertTrue("Missing \"" + expectedToken + "\"", string.contains(expectedToken));
if ("shouldFinalize".equals(field.getName())) {
// Skip; not included in the toString if it has the default value.
continue;
}
Pattern expectedPattern = testPatternForToString(field);
Assert.assertTrue("Missing \"" + field.getName() + "\"", expectedPattern.matcher(string).find());
}
}
@ -256,6 +263,7 @@ public class HllSketchAggregatorFactoryTest
"col",
null,
null,
null,
false
),
new HllSketchBuildAggregatorFactory(
@ -263,6 +271,7 @@ public class HllSketchAggregatorFactoryTest
"col",
null,
null,
null,
true
),
new HllSketchMergeAggregatorFactory(
@ -270,6 +279,7 @@ public class HllSketchAggregatorFactoryTest
"col",
null,
null,
null,
false
),
new HllSketchMergeAggregatorFactory(
@ -277,6 +287,7 @@ public class HllSketchAggregatorFactoryTest
"col",
null,
null,
null,
true
)
)
@ -319,9 +330,9 @@ public class HllSketchAggregatorFactoryTest
return Modifier.isPrivate(modfiers) && !Modifier.isStatic(modfiers) && Modifier.isFinal(modfiers);
}
private static String formatFieldForToString(Field field)
private static Pattern testPatternForToString(Field field)
{
return " " + field.getName() + "=";
return Pattern.compile("\\b" + Pattern.quote(field.getName()) + "=");
}
// Helper for testing abstract base class
@ -341,7 +352,7 @@ public class HllSketchAggregatorFactoryTest
boolean round
)
{
super(name, fieldName, lgK, tgtHllType, round);
super(name, fieldName, lgK, tgtHllType, null, round);
}
@Override

View File

@ -254,7 +254,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ETERNITY)
.setAggregatorSpecs(
new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, false)
new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, null, false)
)
.setPostAggregatorSpecs(
ImmutableList.of(

View File

@ -31,6 +31,7 @@ public class HllSketchMergeAggregatorFactoryTest
private static final String FIELD_NAME = "fieldName";
private static final int LG_K = 2;
private static final String TGT_HLL_TYPE = TgtHllType.HLL_6.name();
private static final boolean SHOULD_FINALIZE = true;
private static final boolean ROUND = true;
private HllSketchMergeAggregatorFactory targetRound;
@ -39,8 +40,8 @@ public class HllSketchMergeAggregatorFactoryTest
@Before
public void setUp()
{
targetRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, ROUND);
targetNoRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, !ROUND);
targetRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, SHOULD_FINALIZE, ROUND);
targetNoRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, SHOULD_FINALIZE, !ROUND);
}
@Test(expected = AggregatorFactoryNotMergeableException.class)
@ -51,6 +52,7 @@ public class HllSketchMergeAggregatorFactoryTest
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
SHOULD_FINALIZE,
ROUND
);
targetRound.getMergingFactory(other);
@ -64,6 +66,7 @@ public class HllSketchMergeAggregatorFactoryTest
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
SHOULD_FINALIZE,
ROUND
);
targetRound.getMergingFactory(other);
@ -78,6 +81,7 @@ public class HllSketchMergeAggregatorFactoryTest
FIELD_NAME,
smallerLgK,
TGT_HLL_TYPE,
SHOULD_FINALIZE,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);
@ -93,6 +97,7 @@ public class HllSketchMergeAggregatorFactoryTest
FIELD_NAME,
largerLgK,
TGT_HLL_TYPE,
SHOULD_FINALIZE,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);
@ -108,6 +113,7 @@ public class HllSketchMergeAggregatorFactoryTest
FIELD_NAME,
LG_K,
smallerTgtHllType,
SHOULD_FINALIZE,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);
@ -123,6 +129,7 @@ public class HllSketchMergeAggregatorFactoryTest
FIELD_NAME,
LG_K,
largerTgtHllType,
SHOULD_FINALIZE,
ROUND
);
HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other);

View File

@ -93,6 +93,7 @@ public class HllSketchToEstimatePostAggregatorTest
"col",
null,
null,
null,
false
)
)

View File

@ -36,6 +36,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
@ -107,6 +108,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim1",
null,
null,
false,
ROUND
)
)
@ -209,6 +211,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim2",
null,
null,
null,
ROUND
),
new FilteredAggregatorFactory(
@ -217,6 +220,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim2",
null,
null,
null,
ROUND
),
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
@ -226,6 +230,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v0",
null,
null,
null,
ROUND
),
new HllSketchBuildAggregatorFactory(
@ -233,10 +238,11 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v1",
null,
null,
null,
ROUND
),
new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", ROUND),
new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND)
new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", null, ROUND),
new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, null, ROUND)
)
)
.context(QUERY_CONTEXT_DEFAULT)
@ -284,6 +290,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"cnt",
null,
null,
null,
ROUND
)
)
@ -361,7 +368,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
aggregators(
new HllSketchBuildAggregatorFactory("a0", "m1", null, null, true)
new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true)
)
)
.setHavingSpec(having(selector("a0", "2", null)))
@ -445,6 +452,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim2",
null,
null,
false,
true
),
new HllSketchBuildAggregatorFactory(
@ -452,6 +460,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"m1",
null,
null,
false,
true
),
new HllSketchBuildAggregatorFactory(
@ -459,6 +468,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v0",
null,
null,
false,
true
),
new HllSketchBuildAggregatorFactory(
@ -466,6 +476,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v1",
null,
null,
false,
true
),
new HllSketchBuildAggregatorFactory(
@ -473,39 +484,37 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim2",
null,
null,
null,
true
)
)
)
.postAggregators(
ImmutableList.of(
new FieldAccessPostAggregator("p0", "a0"),
new FieldAccessPostAggregator("p1", "a1"),
new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false),
new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a0"), false),
new HllSketchToEstimatePostAggregator("p5", new FieldAccessPostAggregator("p4", "a0"), false),
new ExpressionPostAggregator("p6", "(\"p5\" + 1)", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p8", new FieldAccessPostAggregator("p7", "a2"), false),
new ExpressionPostAggregator("p4", "(\"p3\" + 1)", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p6", new FieldAccessPostAggregator("p5", "a2"), false),
new HllSketchToEstimatePostAggregator(
"p10",
new FieldAccessPostAggregator("p9", "a0"),
"p8",
new FieldAccessPostAggregator("p7", "a0"),
false
),
new ExpressionPostAggregator("p11", "abs(\"p10\")", null, TestExprMacroTable.INSTANCE),
new ExpressionPostAggregator("p9", "abs(\"p8\")", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimateWithBoundsPostAggregator(
"p13",
new FieldAccessPostAggregator("p12", "a0"),
"p11",
new FieldAccessPostAggregator("p10", "a0"),
2
),
new HllSketchToEstimateWithBoundsPostAggregator(
"p15",
new FieldAccessPostAggregator("p14", "a0"),
"p13",
new FieldAccessPostAggregator("p12", "a0"),
1
),
new FieldAccessPostAggregator("p16", "a3"),
new HllSketchToStringPostAggregator("p18", new FieldAccessPostAggregator("p17", "a0")),
new HllSketchToStringPostAggregator("p20", new FieldAccessPostAggregator("p19", "a0")),
new ExpressionPostAggregator("p21", "upper(\"p20\")", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true)
new HllSketchToStringPostAggregator("p15", new FieldAccessPostAggregator("p14", "a0")),
new HllSketchToStringPostAggregator("p17", new FieldAccessPostAggregator("p16", "a0")),
new ExpressionPostAggregator("p18", "upper(\"p17\")", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p20", new FieldAccessPostAggregator("p19", "a0"), true)
)
)
.context(QUERY_CONTEXT_DEFAULT)
@ -531,6 +540,167 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
);
}
@Test
public void testHllSketchPostAggsFinalizeOuterSketches()
{
final ImmutableMap<String, Object> queryContext =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true)
.build();
final String sketchSummary = "### HLL SKETCH SUMMARY: \n"
+ " Log Config K : 12\n"
+ " Hll Target : HLL_4\n"
+ " Current Mode : LIST\n"
+ " Memory : false\n"
+ " LB : 2.0\n"
+ " Estimate : 2.000000004967054\n"
+ " UB : 2.000099863468538\n"
+ " OutOfOrder Flag: false\n"
+ " Coupon Count : 2\n";
final String otherSketchSummary = "### HLL SKETCH SUMMARY: \n"
+ " LOG CONFIG K : 12\n"
+ " HLL TARGET : HLL_4\n"
+ " CURRENT MODE : LIST\n"
+ " MEMORY : FALSE\n"
+ " LB : 2.0\n"
+ " ESTIMATE : 2.000000004967054\n"
+ " UB : 2.000099863468538\n"
+ " OUTOFORDER FLAG: FALSE\n"
+ " COUPON COUNT : 2\n";
testQuery(
"SELECT\n"
+ " DS_HLL(dim2),\n"
+ " DS_HLL(m1),\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)),\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)) + 1,\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(CONCAT(dim2, 'hello'))),\n"
+ " ABS(HLL_SKETCH_ESTIMATE(DS_HLL(dim2))),\n"
+ " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2), 2),\n"
+ " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2)),\n"
+ " DS_HLL(POWER(ABS(m1 + 100), 2)),\n"
+ " APPROX_COUNT_DISTINCT_DS_HLL(dim2),\n"
+ " HLL_SKETCH_TO_STRING(DS_HLL(dim2)),\n"
+ " UPPER(HLL_SKETCH_TO_STRING(DS_HLL(dim2))),\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(dim2), true)\n"
+ "FROM druid.foo",
queryContext,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"concat(\"dim2\",'hello')",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
),
new ExpressionVirtualColumn(
"v1",
"pow(abs((\"m1\" + 100)),2)",
ColumnType.DOUBLE,
TestExprMacroTable.INSTANCE
)
)
.aggregators(
ImmutableList.of(
new HllSketchBuildAggregatorFactory(
"a0",
"dim2",
null,
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a1",
"m1",
null,
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a2",
"v0",
null,
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a3",
"v1",
null,
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a4",
"dim2",
null,
null,
null,
true
)
)
)
.postAggregators(
ImmutableList.of(
new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false),
new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a0"), false),
new ExpressionPostAggregator("p4", "(\"p3\" + 1)", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p6", new FieldAccessPostAggregator("p5", "a2"), false),
new HllSketchToEstimatePostAggregator(
"p8",
new FieldAccessPostAggregator("p7", "a0"),
false
),
new ExpressionPostAggregator("p9", "abs(\"p8\")", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimateWithBoundsPostAggregator(
"p11",
new FieldAccessPostAggregator("p10", "a0"),
2
),
new HllSketchToEstimateWithBoundsPostAggregator(
"p13",
new FieldAccessPostAggregator("p12", "a0"),
1
),
new HllSketchToStringPostAggregator("p15", new FieldAccessPostAggregator("p14", "a0")),
new HllSketchToStringPostAggregator("p17", new FieldAccessPostAggregator("p16", "a0")),
new ExpressionPostAggregator("p18", "upper(\"p17\")", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p20", new FieldAccessPostAggregator("p19", "a0"), true)
)
)
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{
"2",
"6",
2.000000004967054d,
3.000000004967054d,
3.000000014901161d,
2.000000004967054d,
"[2.000000004967054,2.0,2.0001997319422404]",
"[2.000000004967054,2.0,2.000099863468538]",
"6",
2L,
sketchSummary,
otherSketchSummary,
2.0
}
)
);
}
@Test
public void testtHllSketchPostAggsPostSort()
{
@ -561,16 +731,16 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim2",
null,
null,
false,
true
)
)
)
.postAggregators(
ImmutableList.of(
new FieldAccessPostAggregator("p0", "a0"),
new HllSketchToEstimatePostAggregator("p2", new FieldAccessPostAggregator("p1", "a0"), false),
new HllSketchToEstimatePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), false),
new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0"))
new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false),
new HllSketchToEstimatePostAggregator("s1", new FieldAccessPostAggregator("s0", "a0"), false),
new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "a0"))
)
)
.context(QUERY_CONTEXT_DEFAULT)
@ -606,6 +776,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim2",
null,
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
@ -613,13 +784,14 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"dim2",
null,
null,
false,
true
)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()),
ImmutableList.of(new Object[]{0L, "0"})
ImmutableList.of(new Object[]{0L, "\"AgEHDAMMAAA=\""})
);
}
@ -648,6 +820,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v0",
null,
null,
null,
true
),
selector("dim1", "nonexistent", null)
@ -658,6 +831,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v0",
null,
null,
false,
true
),
selector("dim1", "nonexistent", null)
@ -667,6 +841,63 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(new Object[]{"a", 0L, "\"AgEHDAMMAAA=\""})
);
}
@Test
public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches()
{
final ImmutableMap<String, Object> queryContext =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true)
.build();
testQuery(
"SELECT\n"
+ "dim2,\n"
+ "APPROX_COUNT_DISTINCT_DS_HLL(dim2) FILTER(WHERE dim1 = 'nonexistent'),"
+ "DS_HLL(dim2) FILTER(WHERE dim1 = 'nonexistent')"
+ "FROM foo WHERE dim2 = 'a' GROUP BY dim2",
queryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory(
"a0",
"v0",
null,
null,
null,
true
),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory(
"a1",
"v0",
null,
null,
null,
true
),
selector("dim1", "nonexistent", null)
)
)
)
.setContext(queryContext)
.build()
),
ImmutableList.of(new Object[]{"a", 0L, "0"})
);
}

View File

@ -59,7 +59,8 @@ public class DoublesSketchAggregatorFactoryTest
"myFactory",
"myField",
1024,
1000L
1000L,
null
);
final byte[] json = mapper.writeValueAsBytes(factory);
final DoublesSketchAggregatorFactory fromJson = (DoublesSketchAggregatorFactory) mapper.readValue(
@ -76,6 +77,7 @@ public class DoublesSketchAggregatorFactoryTest
"myFactory",
"myField",
null,
null,
null
);
@ -90,6 +92,7 @@ public class DoublesSketchAggregatorFactoryTest
"myFactory",
"myField",
128,
null,
null
);
Assert.assertEquals(64, factory.guessAggregatorHeapFootprint(1));
@ -105,6 +108,7 @@ public class DoublesSketchAggregatorFactoryTest
"myFactory",
"myField",
128,
null,
null
);
Assert.assertEquals(24608L, factory.getMaxIntermediateSize());
@ -113,7 +117,8 @@ public class DoublesSketchAggregatorFactoryTest
"myFactory",
"myField",
128,
1_000_000_000_000L
1_000_000_000_000L,
null
);
Assert.assertEquals(34848L, factory.getMaxIntermediateSize());
}
@ -161,7 +166,8 @@ public class DoublesSketchAggregatorFactoryTest
"myFactory",
"myField",
1024,
1000L
1000L,
null
);
Assert.assertEquals(factory, factory.withName("myFactory"));
Assert.assertEquals("newTest", factory.withName("newTest").getName());

View File

@ -51,7 +51,8 @@ public class DoublesSketchMergeAggregatorFactoryTest
final DoublesSketchMergeAggregatorFactory factory = new DoublesSketchMergeAggregatorFactory(
"myFactory",
1024,
1000L
1000L,
null
);
final byte[] json = mapper.writeValueAsBytes(factory);
final DoublesSketchMergeAggregatorFactory fromJson = (DoublesSketchMergeAggregatorFactory) mapper.readValue(
@ -67,7 +68,8 @@ public class DoublesSketchMergeAggregatorFactoryTest
final DoublesSketchMergeAggregatorFactory factory = new DoublesSketchMergeAggregatorFactory(
"myFactory",
1024,
1000L
1000L,
null
);
Assert.assertEquals(factory, factory.withName("myFactory"));
Assert.assertEquals("newTest", factory.withName("newTest").getName());

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.datasketches.quantiles.sql;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
@ -34,6 +35,7 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToCDFPostAggregator;
@ -543,8 +545,8 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
.aggregators(ImmutableList.of(
new LongSumAggregatorFactory("a0", "cnt"),
new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128),
new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128),
new DoublesSketchAggregatorFactory("a3:agg", "v0", 128)
new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128, null, false),
new DoublesSketchAggregatorFactory("a3:agg", "v0", 128, null, false)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator(
@ -696,25 +698,24 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
new DoublesSketchAggregatorFactory("a0:agg", "m1", 128, null, false)
)
)
.postAggregators(
ImmutableList.of(
new FieldAccessPostAggregator("p0", "a0:agg"),
new DoublesSketchToQuantilePostAggregator(
"p2",
new FieldAccessPostAggregator("p1", "a0:agg"),
"p1",
new FieldAccessPostAggregator("p0", "a0:agg"),
0.5
),
new DoublesSketchToQuantilePostAggregator(
"s1",
new FieldAccessPostAggregator("s0", "p0"),
new FieldAccessPostAggregator("s0", "a0:agg"),
0.5
),
new DoublesSketchToQuantilePostAggregator(
"s3",
new FieldAccessPostAggregator("s2", "p0"),
new FieldAccessPostAggregator("s2", "a0:agg"),
0.9800000190734863
)
)
@ -750,8 +751,8 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null),
new DoublesSketchAggregatorFactory("a2:agg", "m1", null),
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null)
new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, false),
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, false)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
@ -760,6 +761,53 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
Double.NaN,
Double.NaN,
"\"AQMIHoAAAAA=\"",
"\"AQMIHoAAAAA=\""
}
)
);
}
@Test
public void testEmptyTimeseriesResultsWithFinalizeSketches()
{
final ImmutableMap<String, Object> queryContext =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true)
.build();
testQuery(
"SELECT\n"
+ "APPROX_QUANTILE_DS(m1, 0.01),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n"
+ "DS_QUANTILES_SKETCH(m1),\n"
+ "DS_QUANTILES_SKETCH(qsketch_m1)\n"
+ "FROM foo WHERE dim2 = 0",
queryContext,
Collections.singletonList(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null),
new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, true),
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, true)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f)
)
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{
Double.NaN,
@ -801,24 +849,99 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a2:agg", "m1", null),
new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, false),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null),
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, false),
selector("dim1", "nonexistent", null)
)
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
new DoublesSketchToQuantilePostAggregator(
"a0",
makeFieldAccessPostAgg("a0:agg"),
0.01f
),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{
"a",
Double.NaN,
Double.NaN,
"\"AQMIHoAAAAA=\"",
"\"AQMIHoAAAAA=\""
}
)
);
}
@Test
public void testGroupByAggregatorDefaultValuesWithFinalizeSketches()
{
final ImmutableMap<String, Object> queryContext =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true)
.build();
testQuery(
"SELECT\n"
+ "dim2,\n"
+ "APPROX_QUANTILE_DS(m1, 0.01) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ "APPROX_QUANTILE_DS(qsketch_m1, 0.01) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ "DS_QUANTILES_SKETCH(m1) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ "DS_QUANTILES_SKETCH(qsketch_m1) FILTER(WHERE dim1 = 'nonexistent')\n"
+ "FROM foo WHERE dim2 = 'a' GROUP BY dim2",
queryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, true),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, true),
selector("dim1", "nonexistent", null)
)
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new DoublesSketchToQuantilePostAggregator(
"a0",
makeFieldAccessPostAgg("a0:agg"),
0.01f
),
new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f)
)
)
.setContext(queryContext)
.build()
),
ImmutableList.of(
new Object[]{
"a",
@ -851,8 +974,8 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", null, 1L),
new DoublesSketchAggregatorFactory("a1:agg", "cnt", null, 1L)
new DoublesSketchAggregatorFactory("a0:agg", "m1", null, 1L, null),
new DoublesSketchAggregatorFactory("a1:agg", "cnt", null, 1L, null)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f),
@ -860,13 +983,13 @@ public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
)
.context(context)
.build()
),
ImmutableList.of(
new Object[]{
1.0,
1.0
}
)
),
ImmutableList.of(
new Object[]{
1.0,
1.0
}
)
);
}

View File

@ -21,12 +21,15 @@ package org.apache.druid.query.aggregation.datasketches.theta.sql;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@ -34,6 +37,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
@ -42,7 +46,6 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@ -64,9 +67,11 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -75,12 +80,30 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
{
private static final String DATA_SOURCE = "foo";
private ExprMacroTable macroTable;
@Before
public void setUp()
{
macroTable = createMacroTable();
}
@Override
public Iterable<? extends Module> getJacksonModules()
{
return Iterables.concat(super.getJacksonModules(), new SketchModule().getJacksonModules());
}
@Override
public ExprMacroTable createMacroTable()
{
final List<ExprMacroTable.ExprMacro> exprMacros = new ArrayList<>();
for (Class<? extends ExprMacroTable.ExprMacro> clazz : ExpressionModule.EXPR_MACROS) {
exprMacros.add(CalciteTests.INJECTOR.getInstance(clazz));
}
return new ExprMacroTable(exprMacros);
}
@Override
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
QueryRunnerFactoryConglomerate conglomerate
@ -210,13 +233,13 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v0",
"substring(\"dim2\", 0, 1)",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
macroTable
),
new ExpressionVirtualColumn(
"v1",
"concat(substring(\"dim2\", 0, 1),'x')",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
macroTable
)
)
.aggregators(
@ -419,7 +442,184 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"v0",
"concat(\"dim2\",'hello')",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
macroTable
)
)
.aggregators(
ImmutableList.of(
new LongSumAggregatorFactory("a0", "cnt"),
new SketchMergeAggregatorFactory(
"a1",
"dim2",
null,
false,
null,
null
),
new SketchMergeAggregatorFactory(
"a2",
"v0",
null,
false,
null,
null
),
new SketchMergeAggregatorFactory(
"a3",
"dim1",
null,
false,
null,
null
)
)
)
.postAggregators(
new SketchEstimatePostAggregator(
"p1",
new FieldAccessPostAggregator("p0", "a1"),
null
),
new SketchEstimatePostAggregator(
"p3",
new FieldAccessPostAggregator("p2", "a2"),
null
),
new SketchEstimatePostAggregator(
"p5",
new FieldAccessPostAggregator("p4", "a1"),
10
),
new SketchSetPostAggregator(
"p8",
"INTERSECT",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p6", "a1"),
new FieldAccessPostAggregator("p7", "a3")
)
),
new SketchSetPostAggregator(
"p11",
"UNION",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p9", "a1"),
new FieldAccessPostAggregator("p10", "a3")
)
),
new SketchSetPostAggregator(
"p14",
"NOT",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p12", "a1"),
new FieldAccessPostAggregator("p13", "a3")
)
),
new SketchSetPostAggregator(
"p17",
"INTERSECT",
32768,
ImmutableList.of(
new FieldAccessPostAggregator("p15", "a1"),
new FieldAccessPostAggregator("p16", "a3")
)
),
new SketchEstimatePostAggregator(
"p23",
new SketchSetPostAggregator(
"p22",
"INTERSECT",
null,
ImmutableList.of(
new SketchSetPostAggregator(
"p20",
"INTERSECT",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p18", "a1"),
new FieldAccessPostAggregator("p19", "a3")
)
),
new FieldAccessPostAggregator("p21", "a1")
)
),
null
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testThetaSketchPostAggsFinalizeOuterSketches()
{
final ImmutableMap<String, Object> queryContext =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true)
.build();
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(
new Object[]{
6L,
2.0d,
3.0d,
"{\"estimate\":2.0,\"highBound\":2.0,\"lowBound\":2.0,\"numStdDev\":10}",
"\"AQMDAAA6zJOQxkPsNomrZQ==\"",
"\"AgMDAAAazJMGAAAAAACAP1XTBztMIcMJ+HOoBBne1zKQxkPsNomrZUeWbJt3n+VpF8EdUoUHAXvxsLkOSE0lfQ==\"",
"\"AQMDAAA6zJMXwR1ShQcBew==\"",
"\"AQMDAAA6zJOQxkPsNomrZQ==\"",
1.0d
}
);
} else {
expectedResults = ImmutableList.of(
new Object[]{
6L,
2.0d,
3.0d,
"{\"estimate\":2.0,\"highBound\":2.0,\"lowBound\":2.0,\"numStdDev\":10}",
"\"AQMDAAA6zJOQxkPsNomrZQ==\"",
"\"AgMDAAAazJMGAAAAAACAP1XTBztMIcMJ+HOoBBne1zKQxkPsNomrZUeWbJt3n+VpF8EdUoUHAXvxsLkOSE0lfQ==\"",
"\"AQMDAAA6zJMXwR1ShQcBew==\"",
"\"AQMDAAA6zJOQxkPsNomrZQ==\"",
1.0d
}
);
}
testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
+ " theta_sketch_estimate(DS_THETA(dim2)),\n"
+ " theta_sketch_estimate(DS_THETA(CONCAT(dim2, 'hello'))),\n"
+ " theta_sketch_estimate_with_error_bounds(DS_THETA(dim2), 10),\n"
+ " THETA_SKETCH_INTERSECT(DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " THETA_SKETCH_UNION(DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " THETA_SKETCH_NOT(DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " THETA_SKETCH_INTERSECT(32768, DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " theta_sketch_estimate(THETA_SKETCH_INTERSECT(THETA_SKETCH_INTERSECT(DS_THETA(dim2), DS_THETA(dim1)), DS_THETA(dim2)))\n"
+ "FROM druid.foo",
queryContext,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"concat(\"dim2\",'hello')",
ColumnType.STRING,
macroTable
)
)
.aggregators(
@ -525,7 +725,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
null
)
)
.context(QUERY_CONTEXT_DEFAULT)
.context(queryContext)
.build()
),
expectedResults
@ -545,6 +745,62 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
testQuery(
StringUtils.format("SELECT THETA_SKETCH_ESTIMATE(y) from (%s)", sql),
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new SketchMergeAggregatorFactory(
"a0",
"dim2",
null,
false,
null,
null
)
)
)
.postAggregators(
new SketchEstimatePostAggregator(
"p1",
new FieldAccessPostAggregator("p0", "a0"),
null
),
new SketchEstimatePostAggregator(
"s1",
new FieldAccessPostAggregator("s0", "a0"),
null
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testThetaSketchPostAggsPostSortFinalizeOuterSketches()
{
final String sql = "SELECT DS_THETA(dim2) as y FROM druid.foo ORDER BY THETA_SKETCH_ESTIMATE(DS_THETA(dim2)) DESC LIMIT 10";
final ImmutableMap<String, Object> queryContext =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true)
.build();
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
2.0d
}
);
testQuery(
StringUtils.format("SELECT THETA_SKETCH_ESTIMATE(y) from (%s)", sql),
queryContext,
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
@ -562,19 +818,18 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
)
)
.postAggregators(
new FieldAccessPostAggregator("p0", "a0"),
new SketchEstimatePostAggregator(
"p2",
new FieldAccessPostAggregator("p1", "a0"),
"p1",
new FieldAccessPostAggregator("p0", "a0"),
null
),
new SketchEstimatePostAggregator(
"s1",
new FieldAccessPostAggregator("s0", "p0"),
new FieldAccessPostAggregator("s0", "a0"),
null
)
)
.context(QUERY_CONTEXT_DEFAULT)
.context(queryContext)
.build()
),
@ -620,7 +875,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"a2",
"dim2",
1024,
null,
false,
null,
null
),
@ -628,7 +883,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"a3",
"thetasketch_dim1",
1024,
null,
false,
null,
null
)
@ -637,7 +892,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(new Object[]{0L, 0L, "0.0", "0.0"})
ImmutableList.of(new Object[]{0L, 0L, "\"AQMDAAAeAAA=\"", "\"AQMDAAAeAAA=\""})
);
}
@ -667,7 +922,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"a0",
"v0",
null,
null,
true,
null,
null
),
@ -678,7 +933,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"a1",
"thetasketch_dim1",
null,
null,
true,
null,
null
),
@ -689,7 +944,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"a2",
"v0",
1024,
null,
false,
null,
null
),
@ -700,7 +955,7 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
"a3",
"thetasketch_dim1",
1024,
null,
false,
null,
null
),
@ -711,6 +966,87 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(new Object[]{"a", 0L, 0L, "\"AQMDAAAeAAA=\"", "\"AQMDAAAeAAA=\""})
);
}
@Test
public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches()
{
final ImmutableMap<String, Object> queryContext =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true)
.build();
testQuery(
"SELECT\n"
+ "dim2,\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ " DS_THETA(dim2, 1024) FILTER(WHERE dim1 = 'nonexistent'),\n"
+ " DS_THETA(thetasketch_dim1, 1024) FILTER(WHERE dim1 = 'nonexistent')\n"
+ "FROM foo WHERE dim2 = 'a' GROUP BY dim2",
queryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new SketchMergeAggregatorFactory(
"a0",
"v0",
null,
true,
null,
null
),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new SketchMergeAggregatorFactory(
"a1",
"thetasketch_dim1",
null,
true,
null,
null
),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new SketchMergeAggregatorFactory(
"a2",
"v0",
1024,
true,
null,
null
),
selector("dim1", "nonexistent", null)
),
new FilteredAggregatorFactory(
new SketchMergeAggregatorFactory(
"a3",
"thetasketch_dim1",
1024,
true,
null,
null
),
selector("dim1", "nonexistent", null)
)
)
)
.setContext(queryContext)
.build()
),
ImmutableList.of(new Object[]{"a", 0L, 0L, "0.0", "0.0"})
);
}

View File

@ -172,8 +172,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// FloatSumAggregator combine method takes in two Float but return Double
new FloatSumAggregatorFactory("sum_added", "added"),
new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false, false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
false
);
@ -266,8 +266,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("sum_added", "added"),
new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false, false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null)
},
false
);
@ -458,8 +458,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
14762,
14761,
14586,
14585,
0,
2,
2,
@ -476,7 +476,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
23156,
22892,
0,
0,
3,
@ -592,8 +592,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
getAndAssertCompactionStatus(
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
14762,
14761,
14586,
14585,
0,
2,
2,
@ -601,7 +601,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
1,
1,
0);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14762");
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14586");
// Run compaction again to compact the remaining day
// Remaining day compacted (1 new segment). Now both days compacted (2 total)
forceTriggerAutoCompaction(2);
@ -612,7 +612,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
23156,
22892,
0,
0,
3,

View File

@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.Calcites;
@ -239,6 +240,7 @@ public class OperatorConversions
* @param rexNode expression meant to be applied on top of the rows
* @param postAggregatorVisitor visitor that manages postagg names and tracks postaggs that were created
* by the translation
* @param useExpressions whether we should consider {@link ExpressionPostAggregator} as a target
*
* @return rexNode referring to fields in rowOrder, or null if not possible
*/
@ -247,7 +249,8 @@ public class OperatorConversions
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode rexNode,
final PostAggregatorVisitor postAggregatorVisitor
final PostAggregatorVisitor postAggregatorVisitor,
final boolean useExpressions
)
{
final SqlKind kind = rexNode.getKind();
@ -268,17 +271,41 @@ public class OperatorConversions
final SqlOperatorConversion conversion = plannerContext.getOperatorTable()
.lookupOperatorConversion(operator);
if (conversion == null) {
return null;
} else {
return conversion.toPostAggregator(
if (conversion != null) {
// Try call-specific translation.
final PostAggregator postAggregator = conversion.toPostAggregator(
plannerContext,
rowSignature,
rexNode,
postAggregatorVisitor
);
if (postAggregator != null) {
return postAggregator;
}
}
} else if (kind == SqlKind.LITERAL) {
}
if (useExpressions) {
// Try to translate to expression postaggregator.
final DruidExpression druidExpression = Expressions.toDruidExpression(
plannerContext,
rowSignature,
rexNode
);
if (druidExpression != null) {
return new ExpressionPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
druidExpression.getExpression(),
null,
plannerContext.getExprMacroTable()
);
}
}
// Could not translate.
if (rexNode instanceof RexCall || kind == SqlKind.LITERAL) {
return null;
} else {
throw new IAE("Unknown rexnode kind: " + kind);
@ -722,7 +749,7 @@ public class OperatorConversions
}
}
private static boolean throwOrReturn(
public static boolean throwOrReturn(
final boolean throwOnFailure,
final SqlCallBinding callBinding,
final Function<SqlCallBinding, CalciteException> exceptionMapper

View File

@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
@ -128,7 +127,8 @@ public class Projection
plannerContext,
inputRowSignature,
postAggregatorRexNode,
postAggregatorVisitor
postAggregatorVisitor,
false
);
if (pagg != null) {
@ -166,16 +166,7 @@ public class Projection
final DruidExpression postAggregatorExpression
)
{
if (postAggregatorComplexDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
// Direct column access on a COMPLEX column, expressions cannot operate on complex columns, only postaggs
// Wrap the column access in a field access postagg so that other postaggs can use it
final PostAggregator postAggregator = new FieldAccessPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
postAggregatorExpression.getDirectColumn()
);
postAggregatorVisitor.addPostAgg(postAggregator);
rowOrder.add(postAggregator.getName());
} else if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
// Direct column access, without any type cast as far as Druid's runtime is concerned.
// (There might be a SQL-level type cast that we don't care about)
rowOrder.add(postAggregatorExpression.getDirectColumn());
@ -326,38 +317,6 @@ public class Projection
return toExprType.equals(fromExprType);
}
/**
* Returns true if a post-aggregation "expression" can be realized as a direct field access. This is true if it's
* a direct column access that doesn't require an implicit cast.
*
* @param aggregateRowSignature signature of the aggregation
* @param expression post-aggregation expression
* @param rexNode RexNode for the post-aggregation expression
*
* @return yes or no
*/
private static boolean postAggregatorComplexDirectColumnIsOk(
final RowSignature aggregateRowSignature,
final DruidExpression expression,
final RexNode rexNode
)
{
if (!expression.isDirectColumnAccess()) {
return false;
}
// Check if a cast is necessary.
final ColumnType toValueType =
aggregateRowSignature.getColumnType(expression.getDirectColumn())
.orElseThrow(
() -> new ISE("Encountered null type for column[%s]", expression.getDirectColumn())
);
final ColumnType fromValueType = Calcites.getColumnTypeForRelDataType(rexNode.getType());
return toValueType.is(ValueType.COMPLEX) && toValueType.equals(fromValueType);
}
public List<PostAggregator> getPostAggregators()
{
// If you ever see this error, it probably means a Projection was created in pre-aggregation mode, but then