diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 990f9ce79c9..221cccdc2b9 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1418,6 +1418,7 @@ The Druid SQL server is configured through the following properties on the Broker
 |`druid.sql.planner.useFallback`|Whether to evaluate operations on the Broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|false|
 |`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on the __time column, so that all generated native queries will have user-specified intervals. If true, all queries without a filter condition on the __time column will fail.|false|
 |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
+|`druid.sql.planner.serializeComplexValues`|Whether to serialize "complex" output values. If false, the class name is returned instead of the serialized value.|true|

 #### Broker Caching

diff --git a/docs/content/development/extensions-core/bloom-filter.md b/docs/content/development/extensions-core/bloom-filter.md
index f878e75b2b3..651cc305962 100644
--- a/docs/content/development/extensions-core/bloom-filter.md
+++ b/docs/content/development/extensions-core/bloom-filter.md
@@ -89,9 +89,9 @@ This string can then be used in the native or sql Druid query.
 Note: `org.apache.hive.common.util.BloomKFilter` provides a serialize method which can be used to serialize bloom filters to an outputStream.

-### SQL Queries
+### Filtering SQL Queries

-Bloom filters are supported in SQL via the `bloom_filter_test` operator:
+Bloom filters can be used in SQL `WHERE` clauses via the `bloom_filter_test` operator:

 ```sql
 SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<expr>, '<serialized_bloom_filter>')
@@ -108,7 +108,11 @@ bloom_filter_test(<expr>, '<serialized_bloom_filter>')

 ## Bloom Filter Query Aggregator

-Input for a `bloomKFilter` can also be created from a druid query with the `bloom` aggregator.
+Input for a `bloomKFilter` can also be created from a druid query with the `bloom` aggregator. Note that it is very
+important to set a reasonable value for the `maxNumEntries` parameter, which is the maximum number of distinct entries
+that the bloom filter can represent without increasing the false positive rate. It may be worth running a query with
+one of the unique count sketches first to estimate a value for this parameter, in order to build a bloom filter
+appropriate for the query.

 ### JSON Specification of Bloom Filter Aggregator

@@ -157,8 +161,19 @@ response
 [{"timestamp":"2015-09-12T00:00:00.000Z","result":{"userBloom":"BAAAJhAAAA..."}}]
 ```

-These values can then be set in the filter specification above.
+These values can then be set in the filter specification described above.

 Ordering results by a bloom filter aggregator, for example in a TopN query, will perform a comparatively expensive
 linear scan _of the filter itself_ to count the number of set bits as a means of approximating how many items have been
-added to the set. As such, ordering by an alternate aggregation is recommended if possible.
\ No newline at end of file
+added to the set. As such, ordering by an alternate aggregation is recommended if possible.
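For illustration, the serialized value from the example response above could be plugged into the native `bloom` filter spec that this section refers to. A minimal sketch, assuming the spec's `type`/`dimension`/`bloomKFilter` fields described earlier in this document; the dimension name is a placeholder and the base64 value is abbreviated:

```json
{
  "type": "bloom",
  "dimension": "<dimension_name>",
  "bloomKFilter": "BAAAJhAAAA..."
}
```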
+
+
+### SQL Bloom Filter Aggregator
+Bloom filters can be computed in SQL expressions with the `BLOOM_FILTER` aggregator:
+
+```sql
+SELECT BLOOM_FILTER(<expr>, <numEntries>) FROM druid.foo WHERE dim2 = 'abc'
+```
+
+This usage requires the setting `druid.sql.planner.serializeComplexValues` to be set to `true`. Bloom filter results in
+a SQL response are serialized into a base64 string, which can then be used in subsequent queries as a filter.
\ No newline at end of file
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 6c9674c66ec..358c1ec3f1a 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -120,6 +120,7 @@ Only the COUNT aggregation can accept DISTINCT.
 |`AVG(expr)`|Averages numbers.|
 |`APPROX_COUNT_DISTINCT(expr)`|Counts distinct values of expr, which can be a regular column or a hyperUnique column. This is always approximate, regardless of the value of "useApproximateCountDistinct". See also `COUNT(DISTINCT expr)`.|
 |`APPROX_QUANTILE(expr, probability, [resolution])`|Computes approximate quantiles on numeric or approxHistogram exprs. The "probability" should be between 0 and 1 (exclusive). The "resolution" is the number of centroids to use for the computation. Higher resolutions will give more precise results but also have higher overhead. If not provided, the default resolution is 50. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to use this function.|
+|`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, where `numEntries` is the maximum number of distinct values that can be added before the false positive rate increases. See the [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.|

 ### Numeric functions

diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
index 324488926f2..8bef477ce57 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
@@ -22,6 +22,7 @@ package org.apache.druid.guice;
 import com.fasterxml.jackson.databind.Module;
 import com.google.inject.Binder;
 import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.aggregation.bloom.sql.BloomFilterSqlAggregator;
 import org.apache.druid.query.expressions.BloomFilterExprMacro;
 import org.apache.druid.query.filter.sql.BloomFilterOperatorConversion;
 import org.apache.druid.sql.guice.SqlBindings;
@@ -42,7 +43,7 @@ public class BloomFilterExtensionModule implements DruidModule
   public void configure(Binder binder)
   {
     SqlBindings.addOperatorConversion(binder, BloomFilterOperatorConversion.class);
-
+    SqlBindings.addAggregator(binder, BloomFilterSqlAggregator.class);
     ExpressionModule.addExprMacro(binder, BloomFilterExprMacro.class);
   }
 }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
new file mode 100644
index 00000000000..0a37dad945f
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
@@ -0,0 +1,212 @@
+/*
+
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom.sql; + +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignature; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class BloomFilterSqlAggregator implements SqlAggregator +{ + private static final SqlAggFunction FUNCTION_INSTANCE = new BloomFilterSqlAggFunction(); + private static final String NAME = "BLOOM_FILTER"; + + @Override + public SqlAggFunction calciteFunction() + { + return FUNCTION_INSTANCE; + } + + @Nullable + @Override + public Aggregation toDruidAggregation( + PlannerContext plannerContext, + RowSignature rowSignature, + RexBuilder rexBuilder, + String name, + AggregateCall aggregateCall, + Project project, + List existingAggregations, + boolean finalizeAggregations + ) + { + final RexNode inputOperand = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(0) + ); + final DruidExpression input = Expressions.toDruidExpression( + plannerContext, + rowSignature, + inputOperand + ); + if (input == null) { + return null; + } + + final AggregatorFactory aggregatorFactory; + final String aggName = StringUtils.format("%s:agg", name); + final RexNode maxNumEntriesOperand = Expressions.fromFieldAccess( + 
rowSignature, + project, + aggregateCall.getArgList().get(1) + ); + + if (!maxNumEntriesOperand.isA(SqlKind.LITERAL)) { + // maxNumEntriesOperand must be a literal in order to plan. + return null; + } + + final int maxNumEntries = ((Number) RexLiteral.value(maxNumEntriesOperand)).intValue(); + + // Look for existing matching aggregatorFactory. + for (final Aggregation existing : existingAggregations) { + for (AggregatorFactory factory : existing.getAggregatorFactories()) { + if (factory instanceof BloomFilterAggregatorFactory) { + final BloomFilterAggregatorFactory theFactory = (BloomFilterAggregatorFactory) factory; + + // Check input for equivalence. + final boolean inputMatches; + final VirtualColumn virtualInput = + existing.getVirtualColumns() + .stream() + .filter(virtualColumn -> + virtualColumn.getOutputName().equals(theFactory.getField().getOutputName()) + ) + .findFirst() + .orElse(null); + + if (virtualInput == null) { + if (input.isDirectColumnAccess()) { + inputMatches = + input.getDirectColumn().equals(theFactory.getField().getDimension()); + } else { + inputMatches = + input.getSimpleExtraction().getColumn().equals(theFactory.getField().getDimension()) && + input.getSimpleExtraction().getExtractionFn().equals(theFactory.getField().getExtractionFn()); + } + } else { + inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression().equals(input.getExpression()); + } + + final boolean matches = inputMatches && theFactory.getMaxNumEntries() == maxNumEntries; + + if (matches) { + // Found existing one. Use this. + return Aggregation.create( + theFactory + ); + } + } + } + } + + // No existing match found. Create a new one. + final List virtualColumns = new ArrayList<>(); + + ValueType valueType = Calcites.getValueTypeForSqlTypeName(inputOperand.getType().getSqlTypeName()); + final DimensionSpec spec; + if (input.isDirectColumnAccess()) { + spec = new DefaultDimensionSpec( + input.getSimpleExtraction().getColumn(), + StringUtils.format("%s:%s", name, input.getSimpleExtraction().getColumn()), + valueType + ); + } else if (input.isSimpleExtraction()) { + spec = new ExtractionDimensionSpec( + input.getSimpleExtraction().getColumn(), + StringUtils.format("%s:%s", name, input.getSimpleExtraction().getColumn()), + valueType, + input.getSimpleExtraction().getExtractionFn() + ); + } else { + final ExpressionVirtualColumn virtualColumn = input.toVirtualColumn( + StringUtils.format("%s:v", aggName), + valueType, + plannerContext.getExprMacroTable() + ); + virtualColumns.add(virtualColumn); + spec = new DefaultDimensionSpec(virtualColumn.getOutputName(), virtualColumn.getOutputName()); + } + + aggregatorFactory = new BloomFilterAggregatorFactory( + aggName, + spec, + maxNumEntries + ); + + return Aggregation.create( + virtualColumns, + aggregatorFactory + ); + } + + private static class BloomFilterSqlAggFunction extends SqlAggFunction + { + private static final String SIGNATURE1 = "'" + NAME + "(column, maxNumEntries)'\n"; + + BloomFilterSqlAggFunction() + { + super( + NAME, + null, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.OTHER), + null, + OperandTypes.and( + OperandTypes.sequence(SIGNATURE1, OperandTypes.ANY, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + ), + SqlFunctionCategory.USER_DEFINED_FUNCTION, + false, + false + ); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java 
b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
index 12533a20c7c..090756a8ec9 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
@@ -33,7 +33,7 @@ import java.util.Arrays;
 /**
  * This is a direct modification of the Apache Hive 'BloomKFilter', found at:
- * https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
+ * https://github.com/apache/hive/blob/rel/storage-release-2.7.0/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
  * modified to store variables which are re-used instead of re-allocated per call as {@link ThreadLocal} so multiple
  * threads can share the same filter object. Note that this is a snapshot at hive-storage-api version 2.7.0; later
  * versions break compatibility with how int/float are stored in a bloom filter in this commit:
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
new file mode 100644
index 00000000000..4641e274899
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.druid.query.aggregation.bloom.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.DoubleDimensionSchema; +import org.apache.druid.data.input.impl.FloatDimensionSchema; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.BloomFilterSerializersModule; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.extraction.SubstringDimExtractionFn; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.query.lookup.LookupReferencesManager; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.SqlLifecycle; +import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.DruidOperatorTable; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.planner.PlannerFactory; +import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.SystemSchema; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.QueryLogHook; +import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.After; +import 
org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class BloomFilterSqlAggregatorTest +{ + private static final int TEST_NUM_ENTRIES = 1000; + private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; + private static final Injector injector = Guice.createInjector( + binder -> { + binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper()); + binder.bind(LookupReferencesManager.class).toInstance( + LookupEnabledTestExprMacroTable.createTestLookupReferencesManager( + ImmutableMap.of( + "a", "xa", + "abc", "xabc" + ) + ) + ); + } + ); + + private static ObjectMapper jsonMapper = + injector + .getInstance(Key.get(ObjectMapper.class, Json.class)) + .registerModules(Collections.singletonList(new BloomFilterSerializersModule())); + + private static final String DATA_SOURCE = "numfoo"; + + private static QueryRunnerFactoryConglomerate conglomerate; + private static Closer resourceCloser; + + @BeforeClass + public static void setUpClass() + { + final Pair conglomerateCloserPair = CalciteTests + .createQueryRunnerFactoryConglomerate(); + conglomerate = conglomerateCloserPair.lhs; + resourceCloser = conglomerateCloserPair.rhs; + } + + @AfterClass + public static void tearDownClass() throws IOException + { + resourceCloser.close(); + } + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public QueryLogHook queryLogHook = QueryLogHook.create(jsonMapper); + + private SpecificSegmentsQuerySegmentWalker walker; + private SqlLifecycleFactory sqlLifecycleFactory; + + @Before + public void setUp() throws Exception + { + InputRowParser parser = new MapInputRowParser( + new TimeAndDimsParseSpec( + new TimestampSpec("t", "iso", null), + new DimensionsSpec( + ImmutableList.builder() + .addAll(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3"))) + .add(new DoubleDimensionSchema("d1")) + .add(new FloatDimensionSchema("f1")) + .add(new LongDimensionSchema("l1")) + .build(), + null, + null + ) + )); + + final QueryableIndex index = + IndexBuilder.create() + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1") + ) + .withDimensionsSpec(parser) + .withRollup(false) + .build() + ) + .rows(CalciteTests.ROWS1_WITH_NUMERIC_DIMS) + .buildMMappedIndex(); + + + walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(index.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .build(), + index + ); + + final PlannerConfig plannerConfig = new PlannerConfig(); + final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final DruidOperatorTable operatorTable = new DruidOperatorTable( + ImmutableSet.of(new BloomFilterSqlAggregator()), + ImmutableSet.of() + ); + + sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory( + new PlannerFactory( + druidSchema, + systemSchema, + 
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + operatorTable, + CalciteTests.createExprMacroTable(), + plannerConfig, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + jsonMapper + ) + ); + } + + @After + public void tearDown() throws Exception + { + walker.close(); + walker = null; + } + + @Test + public void testBloomFilterAgg() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "BLOOM_FILTER(dim1, 1000)\n" + + "FROM numfoo"; + + final List results = + sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES); + for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) { + String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1")); + if (raw == null) { + expected1.addBytes(null, 0, 0); + } else { + expected1.addString(raw); + } + } + + final List expectedResults = ImmutableList.of( + new Object[]{ + jsonMapper.writeValueAsString(expected1) + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new BloomFilterAggregatorFactory( + "a0:agg", + new DefaultDimensionSpec("dim1", "a0:dim1"), + TEST_NUM_ENTRIES + ) + ) + ) + .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testBloomFilterTwoAggs() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "BLOOM_FILTER(dim1, 1000),\n" + + "BLOOM_FILTER(dim2, 1000)\n" + + "FROM numfoo"; + + final List results = + sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES); + BloomKFilter expected2 = new BloomKFilter(TEST_NUM_ENTRIES); + for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) { + String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1")); + if (raw == null) { + expected1.addBytes(null, 0, 0); + } else { + expected1.addString(raw); + } + List lst = row.getDimension("dim2"); + if (lst.size() == 0) { + expected2.addBytes(null, 0, 0); + } + for (String s : lst) { + String val = NullHandling.emptyToNullIfNeeded(s); + if (val == null) { + expected2.addBytes(null, 0, 0); + } else { + expected2.addString(val); + } + } + } + + final List expectedResults = ImmutableList.of( + new Object[]{ + jsonMapper.writeValueAsString(expected1), + jsonMapper.writeValueAsString(expected2) + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new BloomFilterAggregatorFactory( + "a0:agg", + new DefaultDimensionSpec("dim1", "a0:dim1"), + TEST_NUM_ENTRIES + ), + 
new BloomFilterAggregatorFactory( + "a1:agg", + new DefaultDimensionSpec("dim2", "a1:dim2"), + TEST_NUM_ENTRIES + ) + ) + ) + .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testBloomFilterAggExtractionFn() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "BLOOM_FILTER(SUBSTRING(dim1, 1, 1), 1000)\n" + + "FROM numfoo"; + + final List results = + sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES); + for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) { + String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1")); + // empty string extractionFn produces null + if (raw == null || "".equals(raw)) { + expected1.addBytes(null, 0, 0); + } else { + expected1.addString(raw.substring(0, 1)); + } + } + final List expectedResults = ImmutableList.of( + new Object[]{ + jsonMapper.writeValueAsString(expected1) + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new BloomFilterAggregatorFactory( + "a0:agg", + new ExtractionDimensionSpec( + "dim1", + "a0:dim1", + new SubstringDimExtractionFn(0, 1) + ), + TEST_NUM_ENTRIES + ) + ) + ) + .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testBloomFilterAggLong() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + + final String sql = "SELECT\n" + + "BLOOM_FILTER(l1, 1000)\n" + + "FROM numfoo"; + + final List results = + sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + + BloomKFilter expected3 = new BloomKFilter(TEST_NUM_ENTRIES); + for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) { + Object raw = row.getRaw("l1"); + if (raw == null) { + if (NullHandling.replaceWithDefault()) { + expected3.addLong(NullHandling.defaultLongValue()); + } else { + expected3.addBytes(null, 0, 0); + } + } else { + expected3.addLong(((Number) raw).longValue()); + } + } + final List expectedResults = ImmutableList.of( + new Object[]{ + jsonMapper.writeValueAsString(expected3) + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new BloomFilterAggregatorFactory( + "a0:agg", + new DefaultDimensionSpec("l1", "a0:l1", ValueType.LONG), + TEST_NUM_ENTRIES + ) + ) + ) + .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void 
testBloomFilterAggLongVirtualColumn() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "BLOOM_FILTER(l1 * 2, 1000)\n" + + "FROM numfoo"; + + final List results = + sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES); + for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) { + Object raw = row.getRaw("l1"); + if (raw == null) { + if (NullHandling.replaceWithDefault()) { + expected1.addLong(NullHandling.defaultLongValue()); + } else { + expected1.addBytes(null, 0, 0); + } + } else { + expected1.addLong(2 * ((Number) raw).longValue()); + } + } + final List expectedResults = ImmutableList.of( + new Object[]{ + jsonMapper.writeValueAsString(expected1) + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "a0:agg:v", + "(\"l1\" * 2)", + ValueType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + .aggregators( + ImmutableList.of( + new BloomFilterAggregatorFactory( + "a0:agg", + new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"), + TEST_NUM_ENTRIES + ) + ) + ) + .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testBloomFilterAggFloatVirtualColumn() throws Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "BLOOM_FILTER(f1 * 2, 1000)\n" + + "FROM numfoo"; + + final List results = + sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES); + for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) { + Object raw = row.getRaw("f1"); + if (raw == null) { + if (NullHandling.replaceWithDefault()) { + expected1.addFloat(NullHandling.defaultFloatValue()); + } else { + expected1.addBytes(null, 0, 0); + } + } else { + expected1.addFloat(2 * ((Number) raw).floatValue()); + } + } + final List expectedResults = ImmutableList.of( + new Object[]{ + jsonMapper.writeValueAsString(expected1) + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "a0:agg:v", + "(\"f1\" * 2)", + ValueType.FLOAT, + TestExprMacroTable.INSTANCE + ) + ) + .aggregators( + ImmutableList.of( + new BloomFilterAggregatorFactory( + "a0:agg", + new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"), + TEST_NUM_ENTRIES + ) + ) + ) + .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } + + @Test + public void testBloomFilterAggDoubleVirtualColumn() throws 
Exception + { + SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize(); + final String sql = "SELECT\n" + + "BLOOM_FILTER(d1 * 2, 1000)\n" + + "FROM numfoo"; + + final List results = + sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList(); + + BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES); + for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) { + Object raw = row.getRaw("d1"); + if (raw == null) { + if (NullHandling.replaceWithDefault()) { + expected1.addDouble(NullHandling.defaultDoubleValue()); + } else { + expected1.addBytes(null, 0, 0); + } + } else { + expected1.addDouble(2 * ((Number) raw).doubleValue()); + } + } + final List expectedResults = ImmutableList.of( + new Object[]{ + jsonMapper.writeValueAsString(expected1) + } + ); + Assert.assertEquals(expectedResults.size(), results.size()); + for (int i = 0; i < expectedResults.size(); i++) { + Assert.assertArrayEquals(expectedResults.get(i), results.get(i)); + } + + // Verify query + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "a0:agg:v", + "(\"d1\" * 2)", + ValueType.DOUBLE, + TestExprMacroTable.INSTANCE + ) + ) + .aggregators( + ImmutableList.of( + new BloomFilterAggregatorFactory( + "a0:agg", + new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"), + TEST_NUM_ENTRIES + ) + ) + ) + .context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT) + .build(), + Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) + ); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index fe9e72ffb4b..766bf92e6d4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -66,6 +66,9 @@ public class PlannerConfig @JsonProperty private DateTimeZone sqlTimeZone = DateTimeZone.UTC; + @JsonProperty + private boolean serializeComplexValues = true; + public Period getMetadataRefreshPeriod() { return metadataRefreshPeriod; @@ -121,6 +124,11 @@ public class PlannerConfig return awaitInitializationOnStart; } + public boolean shouldSerializeComplexValues() + { + return serializeComplexValues; + } + public PlannerConfig withOverrides(final Map context) { if (context == null) { @@ -151,6 +159,7 @@ public class PlannerConfig newConfig.requireTimeCondition = isRequireTimeCondition(); newConfig.sqlTimeZone = getSqlTimeZone(); newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart(); + newConfig.serializeComplexValues = shouldSerializeComplexValues(); return newConfig; } @@ -191,6 +200,7 @@ public class PlannerConfig useFallback == that.useFallback && requireTimeCondition == that.requireTimeCondition && awaitInitializationOnStart == that.awaitInitializationOnStart && + serializeComplexValues == that.serializeComplexValues && Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) && Objects.equals(sqlTimeZone, that.sqlTimeZone); } @@ -210,7 +220,8 @@ public class PlannerConfig useFallback, requireTimeCondition, awaitInitializationOnStart, - sqlTimeZone + sqlTimeZone, + serializeComplexValues ); } @@ -229,6 +240,7 @@ public class PlannerConfig ", requireTimeCondition=" + requireTimeCondition + ", awaitInitializationOnStart=" + 
awaitInitializationOnStart + ", sqlTimeZone=" + sqlTimeZone + + ", serializeComplexValues=" + serializeComplexValues + '}'; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java index b3f7d0b2bbd..658fb540dd4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.rel; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -493,8 +494,17 @@ public class QueryMaker throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); } } else if (sqlType == SqlTypeName.OTHER) { - // Complex type got out somehow. - coercedValue = value.getClass().getName(); + // Complex type, try to serialize if we should, else print class name + if (plannerContext.getPlannerConfig().shouldSerializeComplexValues()) { + try { + coercedValue = jsonMapper.writeValueAsString(value); + } + catch (JsonProcessingException jex) { + throw new ISE(jex, "Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); + } + } else { + coercedValue = value.getClass().getName(); + } } else { throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index b5d083100c7..6b47b42993b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -101,6 +101,14 @@ public class BaseCalciteQueryTest extends CalciteTestBase public static final Logger log = new Logger(BaseCalciteQueryTest.class); public static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig(); + public static final PlannerConfig PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE = new PlannerConfig() + { + @Override + public boolean shouldSerializeComplexValues() + { + return false; + } + }; public static final PlannerConfig PLANNER_CONFIG_REQUIRE_TIME_CONDITION = new PlannerConfig() { @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 46aed6e589e..b89ae45a1d4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -422,7 +422,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest { String hyperLogLogCollectorClassName = HLLC_STRING; testQuery( + PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE, + QUERY_CONTEXT_DEFAULT, "SELECT * FROM druid.foo", + CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of( newScanQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) @@ -474,7 +477,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest "abcd", 9999.0f, NullHandling.defaultDoubleValue(), - HLLC_STRING + "\"AQAAAQAAAALFBA==\"" } ) ); @@ -518,7 +521,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testSelectStarWithLimit() throws Exception { testQuery( + PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE, + QUERY_CONTEXT_DEFAULT, "SELECT * FROM druid.foo LIMIT 2", + CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of( newScanQueryBuilder() 
.dataSource(CalciteTests.DATASOURCE1) @@ -565,7 +571,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testSelectStarWithLimitTimeDescending() throws Exception { testQuery( + PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE, + QUERY_CONTEXT_DEFAULT, "SELECT * FROM druid.foo ORDER BY __time DESC LIMIT 2", + CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of( Druids.newSelectQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) @@ -589,7 +598,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testSelectStarWithoutLimitTimeAscending() throws Exception { testQuery( + PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE, + QUERY_CONTEXT_DEFAULT, "SELECT * FROM druid.foo ORDER BY __time", + CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of( Druids.newSelectQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) @@ -1852,7 +1864,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testSelectStarWithDimFilter() throws Exception { testQuery( + PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE, + QUERY_CONTEXT_DEFAULT, "SELECT * FROM druid.foo WHERE dim1 > 'd' OR dim2 = 'a'", + CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of( newScanQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java index 8b1c9aae0ca..14b226896d2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java @@ -112,7 +112,14 @@ public class SqlResourceTest extends CalciteTestBase { walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder()); - final PlannerConfig plannerConfig = new PlannerConfig(); + final PlannerConfig plannerConfig = new PlannerConfig() + { + @Override + public boolean shouldSerializeComplexValues() + { + return false; + } + }; final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
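
Taken together, the pieces of this patch describe a two-step round trip in pure SQL. A hedged sketch of the workflow the documentation changes imply (column names mirror the docs' examples, the base64 literal is a truncated placeholder, and `druid.sql.planner.serializeComplexValues=true` is assumed):

```sql
-- Step 1: build a bloom filter over dim1, sized for at most 1000 distinct
-- entries; the result column is a base64-serialized BloomKFilter.
SELECT BLOOM_FILTER(dim1, 1000) FROM druid.foo WHERE dim2 = 'abc';

-- Step 2: feed the serialized filter from step 1 into a later query
-- via the bloom_filter_test operator.
SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, 'BAAAJhAAAA...');
```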