Fix getResultType for HLL, quantiles aggregators. (#15043)

The aggregators had incorrect types for getResultType when shouldFinalze
is false. They had the finalized type, but they should have had the
intermediate type.

Also includes a refactor of how ExprMacroTable is handled in tests, to make
it easier to add tests for this to the MSQ module. The bug was originally
noticed because the incorrect result types caused MSQ queries with DS_HLL
to behave erratically.
This commit is contained in:
Gian Merlino 2023-09-26 20:21:14 -07:00 committed by GitHub
parent 7301e60a9c
commit 3dabfead05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 196 additions and 105 deletions

View File

@ -194,7 +194,11 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
@Override
public ColumnType getResultType()
{
return round ? ColumnType.LONG : ColumnType.DOUBLE;
if (shouldFinalize) {
return round ? ColumnType.LONG : ColumnType.DOUBLE;
} else {
return getIntermediateType();
}
}
@Nullable

View File

@ -410,7 +410,11 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
@Override
public ColumnType getResultType()
{
return ColumnType.LONG;
if (shouldFinalize) {
return ColumnType.LONG;
} else {
return getIntermediateType();
}
}
@Override

View File

@ -293,6 +293,15 @@ public class HllSketchAggregatorFactoryTest
null,
null,
true
),
new HllSketchMergeAggregatorFactory(
"hllMergeNoFinalize",
"col",
null,
null,
null,
false,
false
)
)
.postAggregators(
@ -303,7 +312,14 @@ public class HllSketchAggregatorFactoryTest
new FieldAccessPostAggregator("hllMerge-access", "hllMerge"),
new FinalizingFieldAccessPostAggregator("hllMerge-finalize", "hllMerge"),
new FieldAccessPostAggregator("hllMergeRound-access", "hllMergeRound"),
new FinalizingFieldAccessPostAggregator("hllMergeRound-finalize", "hllMergeRound")
new FinalizingFieldAccessPostAggregator("hllMergeRound-finalize", "hllMergeRound"),
new FieldAccessPostAggregator("hllMergeNoFinalize-access", "hllMergeNoFinalize"),
new FinalizingFieldAccessPostAggregator("hllMergeNoFinalize-finalize", "hllMergeNoFinalize"),
new HllSketchToEstimatePostAggregator(
"hllMergeNoFinalize-estimate",
new FieldAccessPostAggregator(null, "hllMergeNoFinalize"),
false
)
)
.build();
@ -315,6 +331,7 @@ public class HllSketchAggregatorFactoryTest
.add("hllBuildRound", null)
.add("hllMerge", null)
.add("hllMergeRound", null)
.add("hllMergeNoFinalize", HllSketchMergeAggregatorFactory.TYPE)
.add("hllBuild-access", HllSketchBuildAggregatorFactory.TYPE)
.add("hllBuild-finalize", ColumnType.DOUBLE)
.add("hllBuildRound-access", HllSketchBuildAggregatorFactory.TYPE)
@ -323,6 +340,9 @@ public class HllSketchAggregatorFactoryTest
.add("hllMerge-finalize", ColumnType.DOUBLE)
.add("hllMergeRound-access", HllSketchMergeAggregatorFactory.TYPE)
.add("hllMergeRound-finalize", ColumnType.LONG)
.add("hllMergeNoFinalize-access", HllSketchMergeAggregatorFactory.TYPE)
.add("hllMergeNoFinalize-finalize", HllSketchMergeAggregatorFactory.TYPE)
.add("hllMergeNoFinalize-estimate", ColumnType.DOUBLE)
.build(),
new TimeseriesQueryQueryToolChest().resultArraySignature(query)
);

View File

@ -137,13 +137,16 @@ public class DoublesSketchAggregatorFactoryTest
.aggregators(
new CountAggregatorFactory("count"),
new DoublesSketchAggregatorFactory("doublesSketch", "col", 8),
new DoublesSketchMergeAggregatorFactory("doublesSketchMerge", 8)
new DoublesSketchMergeAggregatorFactory("doublesSketchMerge", 8),
new DoublesSketchMergeAggregatorFactory("doublesSketchNoFinalize", 8, null, false)
)
.postAggregators(
new FieldAccessPostAggregator("doublesSketch-access", "doublesSketch"),
new FinalizingFieldAccessPostAggregator("doublesSketch-finalize", "doublesSketch"),
new FieldAccessPostAggregator("doublesSketchMerge-access", "doublesSketchMerge"),
new FinalizingFieldAccessPostAggregator("doublesSketchMerge-finalize", "doublesSketchMerge")
new FinalizingFieldAccessPostAggregator("doublesSketchMerge-finalize", "doublesSketchMerge"),
new FieldAccessPostAggregator("doublesSketchNoFinalize-access", "doublesSketchNoFinalize"),
new FinalizingFieldAccessPostAggregator("doublesSketchNoFinalize-finalize", "doublesSketchNoFinalize")
)
.build();
@ -153,10 +156,13 @@ public class DoublesSketchAggregatorFactoryTest
.add("count", ColumnType.LONG)
.add("doublesSketch", null)
.add("doublesSketchMerge", null)
.add("doublesSketchNoFinalize", DoublesSketchModule.TYPE)
.add("doublesSketch-access", DoublesSketchModule.TYPE)
.add("doublesSketch-finalize", ColumnType.LONG)
.add("doublesSketchMerge-access", DoublesSketchModule.TYPE)
.add("doublesSketchMerge-finalize", ColumnType.LONG)
.add("doublesSketchNoFinalize-access", DoublesSketchModule.TYPE)
.add("doublesSketchNoFinalize-finalize", DoublesSketchModule.TYPE)
.build(),
new TimeseriesQueryQueryToolChest().resultArraySignature(query)
);

View File

@ -245,6 +245,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;
/**
* Tests of MSQ with functions from the "druid-datasketches" extension.
*/
public class MSQDataSketchesTest extends MSQTestBase
{
@Test
public void testHavingOnDsHll()
{
RowSignature resultSignature =
RowSignature.builder()
.add("dim2", ColumnType.STRING)
.add("col", ColumnType.ofComplex("HLLSketchBuild"))
.build();
GroupByQuery query =
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
.setAggregatorSpecs(
aggregators(
new HllSketchBuildAggregatorFactory("a0", "m1", 12, "HLL_4", null, false, true)
)
)
.setHavingSpec(having(expressionFilter(("(hll_sketch_estimate(\"a0\") > 1)"))))
.setContext(DEFAULT_MSQ_CONTEXT)
.build();
testSelectQuery()
.setSql("SELECT dim2, DS_HLL(m1) as col\n"
+ "FROM foo\n"
+ "GROUP BY dim2\n"
+ "HAVING HLL_SKETCH_ESTIMATE(col) > 1")
.setExpectedMSQSpec(MSQSpec.builder()
.query(query)
.columnMappings(new ColumnMappings(ImmutableList.of(
new ColumnMapping("d0", "dim2"),
new ColumnMapping("a0", "col")
)))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(TaskReportMSQDestination.INSTANCE)
.build())
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(
NullHandling.sqlCompatible()
? ImmutableList.of(
new Object[]{null, "\"AgEHDAMIAgCOlN8Fp9xhBA==\""},
new Object[]{"a", "\"AgEHDAMIAgALpZ0PPgu1BA==\""}
)
: ImmutableList.of(
new Object[]{"", "\"AgEHDAMIAwCOlN8FjkSVCqfcYQQ=\""},
new Object[]{"a", "\"AgEHDAMIAgALpZ0PPgu1BA==\""}
)
)
.verifyResults();
}
}

View File

@ -48,6 +48,7 @@ import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.guice.DruidSecondaryModule;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.JoinableFactoryModule;
@ -75,7 +76,6 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterSnapshots;
@ -120,6 +120,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
@ -314,26 +315,28 @@ public class MSQTestBase extends BaseCalciteQueryTest
{
super.configureGuice(builder);
builder.addModule(new DruidModule()
{
// Small subset of MsqSqlModule
@Override
public void configure(Binder binder)
{
// We want this module to bring InputSourceModule along for the ride.
binder.install(new InputSourceModule());
binder.install(new NestedDataModule());
NestedDataModule.registerHandlersAndSerde();
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
}
builder
.addModule(new HllSketchModule())
.addModule(new DruidModule()
{
// Small subset of MsqSqlModule
@Override
public void configure(Binder binder)
{
// We want this module to bring InputSourceModule along for the ride.
binder.install(new InputSourceModule());
binder.install(new NestedDataModule());
NestedDataModule.registerHandlersAndSerde();
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
// We want this module to bring input sources along for the ride.
return new InputSourceModule().getJacksonModules();
}
});
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
// We want this module to bring input sources along for the ride.
return new InputSourceModule().getJacksonModules();
}
});
}
@After
@ -367,10 +370,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
SqlTestFramework qf = queryFramework();
Injector secondInjector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
binder -> {
binder.bind(ExprMacroTable.class).toInstance(CalciteTests.createExprMacroTable());
binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT);
}
new ExpressionModule(),
(Module) binder ->
binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT)
)
);
@ -453,7 +455,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
throw new ISE(e, "Unable to create setup storage connector");
}
binder.bind(ExprMacroTable.class).toInstance(CalciteTests.createExprMacroTable());
binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT);
},
// Requirement of WorkerMemoryParameters.createProductionInstanceForWorker(injector)
@ -474,9 +475,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
binder.bind(MSQTaskSqlEngine.class).toProvider(Providers.of(null));
}
),
new ExpressionModule(),
new MSQExternalDataSourceModule(),
new LookylooModule(),
new SegmentWranglerModule(),
new HllSketchModule(),
binder -> binder.bind(BrokerClient.class).toInstance(brokerClient)
);
// adding node role injection to the modules, since CliPeon would also do that through run method

View File

@ -47,6 +47,10 @@ import org.apache.druid.query.expression.TrimExprMacro;
import java.util.List;
/**
* Module that binds {@link ExprMacroTable} to {@link GuiceExprMacroTable} and configures a starter set of
* {@link ExprMacroTable.ExprMacro} for all macros defined in the "druid-processing" module.
*/
public class ExpressionModule implements Module
{
public static final List<Class<? extends ExprMacroTable.ExprMacro>> EXPR_MACROS =

View File

@ -20,53 +20,26 @@
package org.apache.druid.query.expression;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.math.expr.ExprMacroTable;
public class TestExprMacroTable extends ExprMacroTable
public class TestExprMacroTable
{
public static final ExprMacroTable INSTANCE = new TestExprMacroTable();
public static final ExprMacroTable INSTANCE;
private TestExprMacroTable()
{
this(new DefaultObjectMapper());
}
private TestExprMacroTable(ObjectMapper jsonMapper)
{
super(
ImmutableList.of(
new ArrayQuantileExprMacro(),
new IPv4AddressMatchExprMacro(),
new IPv4AddressParseExprMacro(),
new IPv4AddressStringifyExprMacro(),
new LikeExprMacro(),
new RegexpLikeExprMacro(),
new RegexpExtractExprMacro(),
new RegexpReplaceExprMacro(),
new TimestampCeilExprMacro(),
new TimestampExtractExprMacro(),
new TimestampFloorExprMacro(),
new TimestampFormatExprMacro(),
new TimestampParseExprMacro(),
new TimestampShiftExprMacro(),
new TrimExprMacro.BothTrimExprMacro(),
new TrimExprMacro.LeftTrimExprMacro(),
new TrimExprMacro.RightTrimExprMacro(),
new HyperUniqueExpressions.HllCreateExprMacro(),
new HyperUniqueExpressions.HllAddExprMacro(),
new HyperUniqueExpressions.HllEstimateExprMacro(),
new HyperUniqueExpressions.HllRoundEstimateExprMacro(),
new NestedDataExpressions.JsonObjectExprMacro(),
new NestedDataExpressions.JsonKeysExprMacro(),
new NestedDataExpressions.JsonPathsExprMacro(),
new NestedDataExpressions.JsonValueExprMacro(),
new NestedDataExpressions.JsonQueryExprMacro(),
new NestedDataExpressions.ToJsonStringExprMacro(jsonMapper),
new NestedDataExpressions.ParseJsonExprMacro(jsonMapper),
new NestedDataExpressions.TryParseJsonExprMacro(jsonMapper)
)
static {
final Injector injector = Guice.createInjector(
new DruidGuiceExtensions(),
binder -> binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(new DefaultObjectMapper()),
new ExpressionModule()
);
INSTANCE = injector.getInstance(ExprMacroTable.class);
}
}

View File

@ -20,11 +20,10 @@
package org.apache.druid.sql.calcite.util;
import com.google.inject.Injector;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.sql.calcite.aggregation.SqlAggregationModule;
import org.apache.druid.sql.calcite.util.testoperator.CalciteTestOperatorModule;
@ -37,9 +36,10 @@ public class CalciteTestInjectorBuilder extends CoreInjectorBuilder
public CalciteTestInjectorBuilder()
{
super(new StartupInjectorBuilder()
.withEmptyProperties()
.build());
.withEmptyProperties()
.build());
add(
new ExpressionModule(),
new SegmentWranglerModule(),
new LookylooModule(),
new SqlAggregationModule(),
@ -47,14 +47,6 @@ public class CalciteTestInjectorBuilder extends CoreInjectorBuilder
);
}
public CalciteTestInjectorBuilder withDefaultMacroTable()
{
addModule(binder ->
binder.bind(ExprMacroTable.class).toInstance(TestExprMacroTable.INSTANCE)
);
return this;
}
@Override
public Injector build()
{

View File

@ -220,9 +220,7 @@ public class CalciteTests
null
);
public static final Injector INJECTOR = new CalciteTestInjectorBuilder()
.withDefaultMacroTable()
.build();
public static final Injector INJECTOR = new CalciteTestInjectorBuilder().build();
private CalciteTests()
{
@ -318,7 +316,7 @@ public class CalciteTests
public static ExprMacroTable createExprMacroTable()
{
return QueryFrameworkUtils.createExprMacroTable(INJECTOR);
return INJECTOR.getInstance(ExprMacroTable.class);
}
public static JoinableFactoryWrapper createJoinableFactoryWrapper()

View File

@ -26,10 +26,8 @@ import com.google.inject.Injector;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.GlobalTableDataSource;
@ -38,7 +36,6 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
@ -75,10 +72,7 @@ import org.apache.druid.sql.calcite.view.ViewManager;
import org.easymock.EasyMock;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -249,17 +243,6 @@ public class QueryFrameworkUtils
);
}
public static ExprMacroTable createExprMacroTable(final Injector injector)
{
final List<ExprMacroTable.ExprMacro> exprMacros = new ArrayList<>();
for (Class<? extends ExprMacroTable.ExprMacro> clazz : ExpressionModule.EXPR_MACROS) {
exprMacros.add(injector.getInstance(clazz));
}
exprMacros.add(injector.getInstance(LookupExprMacro.class));
return new ExprMacroTable(exprMacros);
}
public static DruidOperatorTable createOperatorTable(final Injector injector)
{
try {