bloom filter sql aggregator (#6950)

* adds sql aggregator for bloom filter, adds complex value serde for sql results

* fix tests

* checkstyle

* fix copy-paste
This commit is contained in:
Clint Wylie 2019-02-01 13:54:46 -08:00 committed by Jonathan Wei
parent e45f9ea5e9
commit 7a5827e12e
12 changed files with 936 additions and 12 deletions

View File

@ -1418,6 +1418,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.planner.useFallback`|Whether to evaluate operations on the Broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|false|
|`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on __time column so that all generated native queries will have user specified intervals. If true, all queries without filter condition on __time column will fail.|false|
|`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
|`druid.sql.planner.serializeComplexValues`|Whether to serialize "complex" output values. If false, the class name is returned instead of the serialized value.|true|
#### Broker Caching

View File

@ -89,9 +89,9 @@ This string can then be used in the native or sql Druid query.
Note: `org.apache.hive.common.util.BloomKFilter` provides a serialize method which can be used to serialize bloom filters to outputStream.
### SQL Queries
### Filtering SQL Queries
Bloom filters are supported in SQL via the `bloom_filter_test` operator:
Bloom filters can be used in SQL `WHERE` clauses via the `bloom_filter_test` operator:
```sql
SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
@ -108,7 +108,11 @@ bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
## Bloom Filter Query Aggregator
Input for a `bloomKFilter` can also be created from a druid query with the `bloom` aggregator.
Input for a `bloomKFilter` can also be created from a druid query with the `bloom` aggregator. Note that it is very
important to set a reasonable value for the `maxNumEntries` parameter, which is the maximum number of distinct entries
that the bloom filter can represent without increasing the false positive rate. It may be worth performing a query using
one of the unique count sketches to calculate the value for this parameter in order to build a bloom filter appropriate
for the query.
### JSON Specification of Bloom Filter Aggregator
@ -157,8 +161,19 @@ response
[{"timestamp":"2015-09-12T00:00:00.000Z","result":{"userBloom":"BAAAJhAAAA..."}}]
```
These values can then be set in the filter specification above.
These values can then be set in the filter specification described above.
Ordering results by a bloom filter aggregator, for example in a TopN query, will perform a comparatively expensive
linear scan _of the filter itself_ to count the number of set bits as a means of approximating how many items have been
added to the set. As such, ordering by an alternate aggregation is recommended if possible.
### SQL Bloom Filter Aggregator
Bloom filters can be computed in SQL expressions with the `bloom_filter` aggregator:
```sql
SELECT BLOOM_FILTER(<expression>, <max number of entries>) FROM druid.foo WHERE dim2 = 'abc'
```
Note that this requires the setting `druid.sql.planner.serializeComplexValues` to be set to `true`. Bloom filter results in a SQL
response are serialized into a base64 string, which can then be used in subsequent queries as a filter.

View File

@ -120,6 +120,7 @@ Only the COUNT aggregation can accept DISTINCT.
|`AVG(expr)`|Averages numbers.|
|`APPROX_COUNT_DISTINCT(expr)`|Counts distinct values of expr, which can be a regular column or a hyperUnique column. This is always approximate, regardless of the value of "useApproximateCountDistinct". See also `COUNT(DISTINCT expr)`.|
|`APPROX_QUANTILE(expr, probability, [resolution])`|Computes approximate quantiles on numeric or approxHistogram exprs. The "probability" should be between 0 and 1 (exclusive). The "resolution" is the number of centroids to use for the computation. Higher resolutions will give more precise results but also have higher overhead. If not provided, the default resolution is 50. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to use this function.|
|`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, with `numEntries` maximum number of distinct values before false positive rate increases. See [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.|
### Numeric functions

View File

@ -22,6 +22,7 @@ package org.apache.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.bloom.sql.BloomFilterSqlAggregator;
import org.apache.druid.query.expressions.BloomFilterExprMacro;
import org.apache.druid.query.filter.sql.BloomFilterOperatorConversion;
import org.apache.druid.sql.guice.SqlBindings;
@ -42,7 +43,7 @@ public class BloomFilterExtensionModule implements DruidModule
public void configure(Binder binder)
{
// Register the SQL-facing pieces of the bloom filter extension with the given binder:
// the operator conversion, the SQL aggregator, and the native expression macro.
SqlBindings.addOperatorConversion(binder, BloomFilterOperatorConversion.class);
SqlBindings.addAggregator(binder, BloomFilterSqlAggregator.class);
ExpressionModule.addExprMacro(binder, BloomFilterExprMacro.class);
}
}

View File

@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.bloom.sql;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
/**
 * SQL aggregator that plans {@code BLOOM_FILTER(expr, maxNumEntries)} into a
 * {@link BloomFilterAggregatorFactory}.
 *
 * <p>The first operand may be a direct column, a simple extraction on a column, or an arbitrary expression
 * (which is planned as an {@link ExpressionVirtualColumn}). The second operand must be a numeric literal.
 */
public class BloomFilterSqlAggregator implements SqlAggregator
{
  private static final SqlAggFunction FUNCTION_INSTANCE = new BloomFilterSqlAggFunction();
  private static final String NAME = "BLOOM_FILTER";

  @Override
  public SqlAggFunction calciteFunction()
  {
    return FUNCTION_INSTANCE;
  }

  /**
   * Translates a {@code BLOOM_FILTER} aggregate call into a Druid {@link Aggregation}.
   *
   * @return an aggregation wrapping either an equivalent factory already present in
   *         {@code existingAggregations} or a newly created one; {@code null} if the input expression cannot be
   *         translated or {@code maxNumEntries} is not a numeric literal
   */
  @Nullable
  @Override
  public Aggregation toDruidAggregation(
      PlannerContext plannerContext,
      RowSignature rowSignature,
      RexBuilder rexBuilder,
      String name,
      AggregateCall aggregateCall,
      Project project,
      List<Aggregation> existingAggregations,
      boolean finalizeAggregations
  )
  {
    final RexNode inputOperand = Expressions.fromFieldAccess(
        rowSignature,
        project,
        aggregateCall.getArgList().get(0)
    );
    final DruidExpression input = Expressions.toDruidExpression(
        plannerContext,
        rowSignature,
        inputOperand
    );
    if (input == null) {
      return null;
    }

    final String aggName = StringUtils.format("%s:agg", name);
    final RexNode maxNumEntriesOperand = Expressions.fromFieldAccess(
        rowSignature,
        project,
        aggregateCall.getArgList().get(1)
    );

    if (!maxNumEntriesOperand.isA(SqlKind.LITERAL)) {
      // maxNumEntriesOperand must be a literal in order to plan.
      return null;
    }

    // Guard against a NULL (or otherwise non-numeric) literal instead of failing with an NPE/ClassCastException.
    final Object maxNumEntriesValue = RexLiteral.value(maxNumEntriesOperand);
    if (!(maxNumEntriesValue instanceof Number)) {
      return null;
    }
    final int maxNumEntries = ((Number) maxNumEntriesValue).intValue();

    // Look for existing matching aggregatorFactory.
    for (final Aggregation existing : existingAggregations) {
      for (AggregatorFactory factory : existing.getAggregatorFactories()) {
        if (factory instanceof BloomFilterAggregatorFactory) {
          final BloomFilterAggregatorFactory theFactory = (BloomFilterAggregatorFactory) factory;
          if (theFactory.getMaxNumEntries() == maxNumEntries && inputMatches(input, existing, theFactory)) {
            // Found existing one. Use this.
            return Aggregation.create(theFactory);
          }
        }
      }
    }

    // No existing match found. Create a new one.
    final List<VirtualColumn> virtualColumns = new ArrayList<>();
    final ValueType valueType = Calcites.getValueTypeForSqlTypeName(inputOperand.getType().getSqlTypeName());
    final DimensionSpec spec;

    if (input.isDirectColumnAccess()) {
      spec = new DefaultDimensionSpec(
          input.getSimpleExtraction().getColumn(),
          StringUtils.format("%s:%s", name, input.getSimpleExtraction().getColumn()),
          valueType
      );
    } else if (input.isSimpleExtraction()) {
      spec = new ExtractionDimensionSpec(
          input.getSimpleExtraction().getColumn(),
          StringUtils.format("%s:%s", name, input.getSimpleExtraction().getColumn()),
          valueType,
          input.getSimpleExtraction().getExtractionFn()
      );
    } else {
      // Arbitrary expression: expose it through a virtual column and aggregate over that column.
      final ExpressionVirtualColumn virtualColumn = input.toVirtualColumn(
          StringUtils.format("%s:v", aggName),
          valueType,
          plannerContext.getExprMacroTable()
      );
      virtualColumns.add(virtualColumn);
      spec = new DefaultDimensionSpec(virtualColumn.getOutputName(), virtualColumn.getOutputName());
    }

    final AggregatorFactory aggregatorFactory = new BloomFilterAggregatorFactory(
        aggName,
        spec,
        maxNumEntries
    );

    return Aggregation.create(
        virtualColumns,
        aggregatorFactory
    );
  }

  /**
   * Checks whether {@code input} reads the same values as the field of an already planned bloom filter factory.
   *
   * <p>Each input shape is only compared against the matching kind of field: an expression against the
   * expression virtual column feeding the factory, a direct column against a field without an extraction
   * function, and a simple extraction against a field with an equal column and extraction function. Any other
   * combination is treated as "no match" rather than dereferencing a missing simple extraction.
   */
  private static boolean inputMatches(
      final DruidExpression input,
      final Aggregation existing,
      final BloomFilterAggregatorFactory theFactory
  )
  {
    final DimensionSpec field = theFactory.getField();
    final VirtualColumn virtualInput =
        existing.getVirtualColumns()
                .stream()
                .filter(virtualColumn -> virtualColumn.getOutputName().equals(field.getOutputName()))
                .findFirst()
                .orElse(null);

    if (virtualInput != null) {
      return virtualInput instanceof ExpressionVirtualColumn
             && ((ExpressionVirtualColumn) virtualInput).getExpression().equals(input.getExpression());
    }

    if (input.isDirectColumnAccess()) {
      // A plain column must not be matched to a factory that applies an extraction function to that column.
      return field.getExtractionFn() == null
             && input.getDirectColumn().equals(field.getDimension());
    }

    if (input.isSimpleExtraction()) {
      return input.getSimpleExtraction().getColumn().equals(field.getDimension())
             && input.getSimpleExtraction().getExtractionFn().equals(field.getExtractionFn());
    }

    // Input is a general expression but the existing factory is not backed by a virtual column.
    return false;
  }

  /**
   * Calcite function definition for {@code BLOOM_FILTER(column, maxNumEntries)}: any-typed first operand,
   * numeric literal second operand, result type {@code OTHER}.
   */
  private static class BloomFilterSqlAggFunction extends SqlAggFunction
  {
    private static final String SIGNATURE1 = "'" + NAME + "(column, maxNumEntries)'\n";

    BloomFilterSqlAggFunction()
    {
      super(
          NAME,
          null,
          SqlKind.OTHER_FUNCTION,
          ReturnTypes.explicit(SqlTypeName.OTHER),
          null,
          OperandTypes.and(
              OperandTypes.sequence(SIGNATURE1, OperandTypes.ANY, OperandTypes.LITERAL),
              OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC)
          ),
          SqlFunctionCategory.USER_DEFINED_FUNCTION,
          false,
          false
      );
    }
  }
}

View File

@ -33,7 +33,7 @@ import java.util.Arrays;
/**
* This is a direct modification of the Apache Hive 'BloomKFilter', found at:
* https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
* https://github.com/apache/hive/blob/rel/storage-release-2.7.0/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
* modified to store variables which are re-used instead of re-allocated per call as {@link ThreadLocal} so multiple
* threads can share the same filter object. Note that this is snapshot at hive-storage-api version 2.7.0, latest
* versions break compatibility with how int/float are stored in a bloom filter in this commit:

View File

@ -0,0 +1,642 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.bloom.sql;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
import org.apache.druid.sql.SqlLifecycleFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class BloomFilterSqlAggregatorTest
{
private static final int TEST_NUM_ENTRIES = 1000;
private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
private static final Injector injector = Guice.createInjector(
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupReferencesManager.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"
)
)
);
}
);
private static ObjectMapper jsonMapper =
injector
.getInstance(Key.get(ObjectMapper.class, Json.class))
.registerModules(Collections.singletonList(new BloomFilterSerializersModule()));
private static final String DATA_SOURCE = "numfoo";
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
}
@AfterClass
public static void tearDownClass() throws IOException
{
resourceCloser.close();
}
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create(jsonMapper);
private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory;
@Before
public void setUp() throws Exception
{
InputRowParser parser = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("t", "iso", null),
new DimensionsSpec(
ImmutableList.<DimensionSchema>builder()
.addAll(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3")))
.add(new DoubleDimensionSchema("d1"))
.add(new FloatDimensionSchema("f1"))
.add(new LongDimensionSchema("l1"))
.build(),
null,
null
)
));
final QueryableIndex index =
IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1")
)
.withDimensionsSpec(parser)
.withRollup(false)
.build()
)
.rows(CalciteTests.ROWS1_WITH_NUMERIC_DIMS)
.buildMMappedIndex();
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(index.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
.build(),
index
);
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new BloomFilterSqlAggregator()),
ImmutableSet.of()
);
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
CalciteTests.createExprMacroTable(),
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
jsonMapper
)
);
}
@After
public void tearDown() throws Exception
{
walker.close();
walker = null;
}
@Test
public void testBloomFilterAgg() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ "BLOOM_FILTER(dim1, 1000)\n"
+ "FROM numfoo";
final List<Object[]> results =
sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1"));
if (raw == null) {
expected1.addBytes(null, 0, 0);
} else {
expected1.addString(raw);
}
}
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
jsonMapper.writeValueAsString(expected1)
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new BloomFilterAggregatorFactory(
"a0:agg",
new DefaultDimensionSpec("dim1", "a0:dim1"),
TEST_NUM_ENTRIES
)
)
)
.context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@Test
public void testBloomFilterTwoAggs() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ "BLOOM_FILTER(dim1, 1000),\n"
+ "BLOOM_FILTER(dim2, 1000)\n"
+ "FROM numfoo";
final List<Object[]> results =
sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
BloomKFilter expected2 = new BloomKFilter(TEST_NUM_ENTRIES);
for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1"));
if (raw == null) {
expected1.addBytes(null, 0, 0);
} else {
expected1.addString(raw);
}
List<String> lst = row.getDimension("dim2");
if (lst.size() == 0) {
expected2.addBytes(null, 0, 0);
}
for (String s : lst) {
String val = NullHandling.emptyToNullIfNeeded(s);
if (val == null) {
expected2.addBytes(null, 0, 0);
} else {
expected2.addString(val);
}
}
}
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
jsonMapper.writeValueAsString(expected1),
jsonMapper.writeValueAsString(expected2)
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new BloomFilterAggregatorFactory(
"a0:agg",
new DefaultDimensionSpec("dim1", "a0:dim1"),
TEST_NUM_ENTRIES
),
new BloomFilterAggregatorFactory(
"a1:agg",
new DefaultDimensionSpec("dim2", "a1:dim2"),
TEST_NUM_ENTRIES
)
)
)
.context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@Test
public void testBloomFilterAggExtractionFn() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ "BLOOM_FILTER(SUBSTRING(dim1, 1, 1), 1000)\n"
+ "FROM numfoo";
final List<Object[]> results =
sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
String raw = NullHandling.emptyToNullIfNeeded((String) row.getRaw("dim1"));
// empty string extractionFn produces null
if (raw == null || "".equals(raw)) {
expected1.addBytes(null, 0, 0);
} else {
expected1.addString(raw.substring(0, 1));
}
}
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
jsonMapper.writeValueAsString(expected1)
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new BloomFilterAggregatorFactory(
"a0:agg",
new ExtractionDimensionSpec(
"dim1",
"a0:dim1",
new SubstringDimExtractionFn(0, 1)
),
TEST_NUM_ENTRIES
)
)
)
.context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@Test
public void testBloomFilterAggLong() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ "BLOOM_FILTER(l1, 1000)\n"
+ "FROM numfoo";
final List<Object[]> results =
sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
BloomKFilter expected3 = new BloomKFilter(TEST_NUM_ENTRIES);
for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
Object raw = row.getRaw("l1");
if (raw == null) {
if (NullHandling.replaceWithDefault()) {
expected3.addLong(NullHandling.defaultLongValue());
} else {
expected3.addBytes(null, 0, 0);
}
} else {
expected3.addLong(((Number) raw).longValue());
}
}
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
jsonMapper.writeValueAsString(expected3)
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new BloomFilterAggregatorFactory(
"a0:agg",
new DefaultDimensionSpec("l1", "a0:l1", ValueType.LONG),
TEST_NUM_ENTRIES
)
)
)
.context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@Test
public void testBloomFilterAggLongVirtualColumn() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ "BLOOM_FILTER(l1 * 2, 1000)\n"
+ "FROM numfoo";
final List<Object[]> results =
sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
Object raw = row.getRaw("l1");
if (raw == null) {
if (NullHandling.replaceWithDefault()) {
expected1.addLong(NullHandling.defaultLongValue());
} else {
expected1.addBytes(null, 0, 0);
}
} else {
expected1.addLong(2 * ((Number) raw).longValue());
}
}
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
jsonMapper.writeValueAsString(expected1)
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"a0:agg:v",
"(\"l1\" * 2)",
ValueType.LONG,
TestExprMacroTable.INSTANCE
)
)
.aggregators(
ImmutableList.of(
new BloomFilterAggregatorFactory(
"a0:agg",
new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"),
TEST_NUM_ENTRIES
)
)
)
.context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@Test
public void testBloomFilterAggFloatVirtualColumn() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ "BLOOM_FILTER(f1 * 2, 1000)\n"
+ "FROM numfoo";
final List<Object[]> results =
sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
Object raw = row.getRaw("f1");
if (raw == null) {
if (NullHandling.replaceWithDefault()) {
expected1.addFloat(NullHandling.defaultFloatValue());
} else {
expected1.addBytes(null, 0, 0);
}
} else {
expected1.addFloat(2 * ((Number) raw).floatValue());
}
}
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
jsonMapper.writeValueAsString(expected1)
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
// Verify query
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"a0:agg:v",
"(\"f1\" * 2)",
ValueType.FLOAT,
TestExprMacroTable.INSTANCE
)
)
.aggregators(
ImmutableList.of(
new BloomFilterAggregatorFactory(
"a0:agg",
new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"),
TEST_NUM_ENTRIES
)
)
)
.context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
@Test
public void testBloomFilterAggDoubleVirtualColumn() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ "BLOOM_FILTER(d1 * 2, 1000)\n"
+ "FROM numfoo";
final List<Object[]> results =
sqlLifecycle.runSimple(sql, BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
BloomKFilter expected1 = new BloomKFilter(TEST_NUM_ENTRIES);
for (InputRow row : CalciteTests.ROWS1_WITH_NUMERIC_DIMS) {
Object raw = row.getRaw("d1");
if (raw == null) {
if (NullHandling.replaceWithDefault()) {
expected1.addDouble(NullHandling.defaultDoubleValue());
} else {
expected1.addBytes(null, 0, 0);
}
} else {
expected1.addDouble(2 * ((Number) raw).doubleValue());
}
}
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
jsonMapper.writeValueAsString(expected1)
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
// Verify query
Assert.assertEquals(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"a0:agg:v",
"(\"d1\" * 2)",
ValueType.DOUBLE,
TestExprMacroTable.INSTANCE
)
)
.aggregators(
ImmutableList.of(
new BloomFilterAggregatorFactory(
"a0:agg",
new DefaultDimensionSpec("a0:agg:v", "a0:agg:v"),
TEST_NUM_ENTRIES
)
)
)
.context(BaseCalciteQueryTest.TIMESERIES_CONTEXT_DEFAULT)
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}
}

View File

@ -66,6 +66,9 @@ public class PlannerConfig
@JsonProperty
private DateTimeZone sqlTimeZone = DateTimeZone.UTC;
// Whether "complex" SQL output values are serialized in results. When false, the class name
// of the value is returned instead of its serialized form. Defaults to true.
@JsonProperty
private boolean serializeComplexValues = true;
public Period getMetadataRefreshPeriod()
{
return metadataRefreshPeriod;
@ -121,6 +124,11 @@ public class PlannerConfig
return awaitInitializationOnStart;
}
/**
 * Reports whether complex-typed SQL result values should be serialized.
 *
 * QueryMaker consults this when coercing a value of SQL type OTHER: if true the value is
 * written out as a JSON string, otherwise the value's class name is used in its place.
 *
 * @return the current value of the {@code serializeComplexValues} setting
 */
public boolean shouldSerializeComplexValues()
{
return serializeComplexValues;
}
public PlannerConfig withOverrides(final Map<String, Object> context)
{
if (context == null) {
@ -151,6 +159,7 @@ public class PlannerConfig
newConfig.requireTimeCondition = isRequireTimeCondition();
newConfig.sqlTimeZone = getSqlTimeZone();
newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart();
newConfig.serializeComplexValues = shouldSerializeComplexValues();
return newConfig;
}
@ -191,6 +200,7 @@ public class PlannerConfig
useFallback == that.useFallback &&
requireTimeCondition == that.requireTimeCondition &&
awaitInitializationOnStart == that.awaitInitializationOnStart &&
serializeComplexValues == that.serializeComplexValues &&
Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
Objects.equals(sqlTimeZone, that.sqlTimeZone);
}
@ -210,7 +220,8 @@ public class PlannerConfig
useFallback,
requireTimeCondition,
awaitInitializationOnStart,
sqlTimeZone
sqlTimeZone,
serializeComplexValues
);
}
@ -229,6 +240,7 @@ public class PlannerConfig
", requireTimeCondition=" + requireTimeCondition +
", awaitInitializationOnStart=" + awaitInitializationOnStart +
", sqlTimeZone=" + sqlTimeZone +
", serializeComplexValues=" + serializeComplexValues +
'}';
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.rel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@ -493,8 +494,17 @@ public class QueryMaker
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.OTHER) {
// Complex type got out somehow.
// Complex type, try to serialize if we should, else print class name
if (plannerContext.getPlannerConfig().shouldSerializeComplexValues()) {
try {
coercedValue = jsonMapper.writeValueAsString(value);
}
catch (JsonProcessingException jex) {
throw new ISE(jex, "Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else {
coercedValue = value.getClass().getName();
}
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}

View File

@ -101,6 +101,14 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public static final Logger log = new Logger(BaseCalciteQueryTest.class);
public static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
/**
 * A default-constructed {@link PlannerConfig} whose only override makes
 * {@link PlannerConfig#shouldSerializeComplexValues()} return false, for tests that do not
 * want complex result values serialized.
 */
public static final PlannerConfig PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE = new PlannerConfig()
{
@Override
public boolean shouldSerializeComplexValues()
{
// Disable serialization of complex values for queries planned with this config.
return false;
}
};
public static final PlannerConfig PLANNER_CONFIG_REQUIRE_TIME_CONDITION = new PlannerConfig()
{
@Override

View File

@ -422,7 +422,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
{
String hyperLogLogCollectorClassName = HLLC_STRING;
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
"SELECT * FROM druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@ -474,7 +477,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
"abcd",
9999.0f,
NullHandling.defaultDoubleValue(),
HLLC_STRING
"\"AQAAAQAAAALFBA==\""
}
)
);
@ -518,7 +521,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testSelectStarWithLimit() throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
"SELECT * FROM druid.foo LIMIT 2",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@ -565,7 +571,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testSelectStarWithLimitTimeDescending() throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
"SELECT * FROM druid.foo ORDER BY __time DESC LIMIT 2",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
Druids.newSelectQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@ -589,7 +598,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testSelectStarWithoutLimitTimeAscending() throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
"SELECT * FROM druid.foo ORDER BY __time",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
Druids.newSelectQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@ -1852,7 +1864,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testSelectStarWithDimFilter() throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
"SELECT * FROM druid.foo WHERE dim1 > 'd' OR dim2 = 'a'",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)

View File

@ -112,7 +112,14 @@ public class SqlResourceTest extends CalciteTestBase
{
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
final PlannerConfig plannerConfig = new PlannerConfig()
{
@Override
public boolean shouldSerializeComplexValues()
{
return false;
}
};
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();