add bloom filter druid expression (#6904)

* add "bloom_filter_test" druid expression to support bloom filters in ExpressionVirtualColumn and ExpressionDimFilter and sql expressions

* more docs

* use java.util.Base64, doc fixes
This commit is contained in:
Clint Wylie 2019-01-28 05:41:45 -08:00 committed by Gian Merlino
parent 2b73644340
commit af3cbc3687
7 changed files with 239 additions and 16 deletions

View File

@ -58,7 +58,9 @@ Internally, this implementation of bloom filter uses Murmur3 fast non-cryptograp
### Serialized Format for BloomKFilter
Serialized BloomKFilter format:
- 1 byte for the number of hash functions.
- 1 big endian int (that is how OutputStream works) for the number of longs in the bitset
- big endian longs in the BloomKFilter bitset
@ -66,10 +68,19 @@ Internally, this implementation of bloom filter uses Murmur3 fast non-cryptograp
Note: `org.apache.hive.common.util.BloomKFilter` provides a serialize method which can be used to serialize bloom filters to outputStream.
### SQL Queries
Bloom filters are supported in SQL via the `bloom_filter_test` operator:
```sql
SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<dimension>, '<serialized_bytes_for_BloomKFilter>')
SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
```
Expression virtual columns were not previously supported for the `dimension` parameter; the `<expr>` form above now covers them.
### Expression and Virtual Column Support
The bloom filter extension also adds a bloom filter [Druid expression](../../misc/math-expr.html) which shares syntax
with the SQL operator.
```sql
bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
```

View File

@ -60,6 +60,7 @@ The following built-in functions are available.
|like|like(expr, pattern[, escape]) is equivalent to SQL `expr LIKE pattern`|
|case_searched|case_searched(expr1, result1, \[\[expr2, result2, ...\], else-result\])|
|case_simple|case_simple(expr, value1, result1, \[\[value2, result2, ...\], else-result\])|
|bloom_filter_test|bloom_filter_test(expr, filter) tests the value of 'expr' against 'filter', a bloom filter serialized as a base64 string. See [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.|
## String functions

View File

@ -22,6 +22,7 @@ package org.apache.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.expressions.BloomFilterExprMacro;
import org.apache.druid.query.filter.sql.BloomFilterOperatorConversion;
import org.apache.druid.sql.guice.SqlBindings;
@ -41,5 +42,7 @@ public class BloomFilterExtensionModule implements DruidModule
public void configure(Binder binder)
{
SqlBindings.addOperatorConversion(binder, BloomFilterOperatorConversion.class);
ExpressionModule.addExprMacro(binder, BloomFilterExprMacro.class);
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.expressions;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.query.filter.BloomKFilter;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.List;
public class BloomFilterExprMacro implements ExprMacroTable.ExprMacro
{
public static String FN_NAME = "bloom_filter_test";
@Override
public String name()
{
return FN_NAME;
}
@Override
public Expr apply(List<Expr> args)
{
if (args.size() != 2) {
throw new IAE("Function[%s] must have 2 arguments", name());
}
final Expr arg = args.get(0);
final Expr filterExpr = args.get(1);
if (!filterExpr.isLiteral() || filterExpr.getLiteralValue() == null) {
throw new IAE("Function[%s] second argument must be a base64 serialized bloom filter", name());
}
final String serializedFilter = filterExpr.getLiteralValue().toString();
final byte[] decoded = StringUtils.decodeBase64String(serializedFilter);
BloomKFilter filter;
try {
filter = BloomFilterSerializersModule.bloomKFilterFromBytes(decoded);
}
catch (IOException ioe) {
throw new RuntimeException("Failed to deserialize bloom filter", ioe);
}
class BloomExpr implements Expr
{
@Nonnull
@Override
public ExprEval eval(final ObjectBinding bindings)
{
ExprEval evaluated = arg.eval(bindings);
boolean matches = false;
switch (evaluated.type()) {
case STRING:
String stringVal = (String) evaluated.value();
if (stringVal == null) {
matches = nullMatch();
} else {
matches = filter.testString(stringVal);
}
break;
case DOUBLE:
Double doubleVal = (Double) evaluated.value();
if (doubleVal == null) {
matches = nullMatch();
} else {
matches = filter.testDouble(doubleVal);
}
break;
case LONG:
Long longVal = (Long) evaluated.value();
if (longVal == null) {
matches = nullMatch();
} else {
matches = filter.testLong(longVal);
}
break;
}
return ExprEval.of(matches, ExprType.LONG);
}
private boolean nullMatch()
{
return filter.testBytes(null, 0, 0);
}
@Override
public void visit(final Visitor visitor)
{
arg.visit(visitor);
visitor.visit(this);
}
}
return new BloomExpr();
}
}

View File

@ -28,14 +28,15 @@ import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.expressions.BloomFilterExprMacro;
import org.apache.druid.query.filter.BloomDimFilter;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.filter.BloomKFilterHolder;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
@ -43,14 +44,19 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
public class BloomFilterOperatorConversion implements SqlOperatorConversion
public class BloomFilterOperatorConversion extends DirectOperatorConversion
{
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("BLOOM_FILTER_TEST")
.operandTypes(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)
.operatorBuilder(StringUtils.toUpperCase(BloomFilterExprMacro.FN_NAME))
.operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER)
.returnTypeInference(ReturnTypes.BOOLEAN_NULLABLE)
.build();
public BloomFilterOperatorConversion()
{
super(SQL_FUNCTION, BloomFilterExprMacro.FN_NAME);
}
@Override
public SqlOperator calciteOperator()
{

View File

@ -26,17 +26,23 @@ import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.BloomFilterExtensionModule;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.expressions.BloomFilterExprMacro;
import org.apache.druid.query.filter.BloomDimFilter;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.filter.BloomKFilterHolder;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.segment.TestHelper;
@ -50,6 +56,7 @@ import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -76,6 +83,17 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
.getInstance(Key.get(ObjectMapper.class, Json.class))
.registerModules(Collections.singletonList(new BloomFilterSerializersModule()));
public static ExprMacroTable createExprMacroTable()
{
final List<ExprMacroTable.ExprMacro> exprMacros = new ArrayList<>();
for (Class<? extends ExprMacroTable.ExprMacro> clazz : ExpressionModule.EXPR_MACROS) {
exprMacros.add(injector.getInstance(clazz));
}
exprMacros.add(injector.getInstance(BloomFilterExprMacro.class));
exprMacros.add(injector.getInstance(LookupExprMacro.class));
return new ExprMacroTable(exprMacros);
}
@Rule
@Override
public QueryLogHook getQueryLogHook()
@ -111,6 +129,74 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
);
}
@Test
public void testBloomFilterVirtualColumn() throws Exception
{
BloomKFilter filter = new BloomKFilter(1500);
filter.addString("a-foo");
filter.addString("-foo");
if (!NullHandling.replaceWithDefault()) {
filter.addBytes(null, 0, 0);
}
byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter);
String base64 = StringUtils.encodeBase64String(bytes);
testQuery(
StringUtils.format("SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(concat(dim2, '-foo'), '%s')", base64),
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.granularity(Granularities.ALL)
.virtualColumns()
.filters(
new ExpressionDimFilter(
StringUtils.format("bloom_filter_test(concat(\"dim2\",'-foo'),'%s')", base64),
createExprMacroTable()
)
)
.aggregators(AGGS(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{5L}
)
);
}
@Test
public void testBloomFilterVirtualColumnNumber() throws Exception
{
BloomKFilter filter = new BloomKFilter(1500);
filter.addDouble(20.2);
byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter);
String base64 = StringUtils.encodeBase64String(bytes);
testQuery(
StringUtils.format("SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(2 * CAST(dim1 AS float), '%s')", base64),
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.granularity(Granularities.ALL)
.virtualColumns()
.filters(
new ExpressionDimFilter(
StringUtils.format("bloom_filter_test((2 * CAST(\"dim1\", 'DOUBLE')),'%s')", base64),
createExprMacroTable()
)
)
.aggregators(AGGS(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{1L}
)
);
}
@Test
public void testBloomFilters() throws Exception
{
@ -123,7 +209,6 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
String base64 = StringUtils.encodeBase64String(bytes);
String base642 = StringUtils.encodeBase64String(bytes2);
testQuery(
StringUtils.format("SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, '%s') OR bloom_filter_test(dim2, '%s')", base64, base642),
ImmutableList.of(
@ -165,7 +250,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
sql,
authenticationResult,
operatorTable,
CalciteTests.createExprMacroTable(),
createExprMacroTable(),
CalciteTests.TEST_AUTHORIZER_MAPPER,
jsonMapper
);

View File

@ -48,14 +48,7 @@ public interface SqlOperatorConversion
* @see Expressions#toDruidExpression(PlannerContext, RowSignature, RexNode)
*/
@Nullable
default DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode);
/**
* Returns a Druid Aggregation corresponding to a SQL {@link SqlOperator} used to filter rows