bloom filter sql (#6502)

* bloom filter sql support

* docs

* style fix

* style fixes after rebase

* use copied/patched bloomkfilter

* remove context literal lookup function, changes from review

* fix build

* rename LookupOperatorConversion to QueryLookupOperatorConversion

* remove doc

* revert unintended change

* add internal exception to bloom filter deserialization exception
This commit is contained in:
Clint Wylie 2018-11-26 22:11:18 -08:00 committed by Benedict Jin
parent 887c645675
commit efdec50847
16 changed files with 1157 additions and 676 deletions

View File

@ -62,3 +62,12 @@ Internally, this implementation of bloom filter uses Murmur3 fast non-cryptograp
- big endian longs in the BloomKFilter bitset
Note: `org.apache.hive.common.util.BloomKFilter` provides a serialize method which can be used to serialize bloom filters to outputStream.
### SQL Queries
Bloom filters are supported in SQL via the `bloom_filter_test` operator:
```sql
SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<dimension>, '<serialized_bytes_for_BloomKFilter>')
```
Expression virtual columns are not currently supported for the `dimension` parameter.

View File

@ -41,6 +41,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
@ -55,11 +61,37 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -22,6 +22,8 @@ package org.apache.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.filter.sql.BloomFilterOperatorConversion;
import org.apache.druid.sql.guice.SqlBindings;
import java.util.Collections;
import java.util.List;
@ -38,6 +40,6 @@ public class BloomFilterExtensionModule implements DruidModule
@Override
public void configure(Binder binder)
{
SqlBindings.addOperatorConversion(binder, BloomFilterOperatorConversion.class);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.filter.sql;
import com.google.common.io.BaseEncoding;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.query.filter.BloomDimFilter;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.filter.BloomKFilterHolder;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
public class BloomFilterOperatorConversion implements SqlOperatorConversion
{
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("BLOOM_FILTER_TEST")
.operandTypes(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)
.returnTypeInference(ReturnTypes.BOOLEAN_NULLABLE)
.build();
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Nullable
@Override
public DimFilter toDruidFilter(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode rexNode
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final DruidExpression druidExpression = Expressions.toDruidExpression(
plannerContext,
rowSignature,
operands.get(0)
);
if (druidExpression == null || !druidExpression.isSimpleExtraction()) {
return null;
}
String base64EncodedBloomKFilter = RexLiteral.stringValue(operands.get(1));
final byte[] decoded = BaseEncoding.base64().decode(base64EncodedBloomKFilter);
BloomKFilter filter;
BloomKFilterHolder holder;
try {
filter = BloomFilterSerializersModule.bloomKFilterFromBytes(decoded);
holder = BloomKFilterHolder.fromBloomKFilter(filter);
}
catch (IOException ioe) {
throw new RuntimeException("Failed to deserialize bloom filter", ioe);
}
if (druidExpression.isSimpleExtraction()) {
return new BloomDimFilter(
druidExpression.getSimpleExtraction().getColumn(),
holder,
druidExpression.getSimpleExtraction().getExtractionFn()
);
} else {
// expression virtual columns not currently supported
return null;
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.filter.sql;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.commons.codec.binary.Base64;
import org.apache.druid.guice.BloomFilterExtensionModule;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.filter.BloomDimFilter;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.filter.BloomKFilterHolder;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.junit.Rule;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
{
private static final Injector injector = Guice.createInjector(
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupReferencesManager.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"
)
)
);
},
new BloomFilterExtensionModule()
);
private static ObjectMapper jsonMapper =
injector
.getInstance(Key.get(ObjectMapper.class, Json.class))
.registerModules(Collections.singletonList(new BloomFilterSerializersModule()));
@Rule
@Override
public QueryLogHook getQueryLogHook()
{
return queryLogHook = QueryLogHook.create(jsonMapper);
}
@Test
public void testBloomFilter() throws Exception
{
BloomKFilter filter = new BloomKFilter(1500);
filter.addString("def");
byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter);
String base64 = Base64.encodeBase64String(bytes);
testQuery(
StringUtils.format("SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, '%s')", base64),
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.granularity(Granularities.ALL)
.filters(
new BloomDimFilter("dim1", BloomKFilterHolder.fromBloomKFilter(filter), null)
)
.aggregators(AGGS(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{1L}
)
);
}
@Test
public void testBloomFilters() throws Exception
{
BloomKFilter filter = new BloomKFilter(1500);
filter.addString("def");
BloomKFilter filter2 = new BloomKFilter(1500);
filter.addString("abc");
byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter);
byte[] bytes2 = BloomFilterSerializersModule.bloomKFilterToBytes(filter2);
String base64 = Base64.encodeBase64String(bytes);
String base642 = Base64.encodeBase64String(bytes2);
testQuery(
StringUtils.format("SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(dim1, '%s') OR bloom_filter_test(dim2, '%s')", base64, base642),
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.granularity(Granularities.ALL)
.filters(
new OrDimFilter(
new BloomDimFilter("dim1", BloomKFilterHolder.fromBloomKFilter(filter), null),
new BloomDimFilter("dim2", BloomKFilterHolder.fromBloomKFilter(filter2), null)
)
)
.aggregators(AGGS(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{2L}
)
);
}
@Override
public List<Object[]> getResults(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
final String sql,
final AuthenticationResult authenticationResult
) throws Exception
{
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(),
ImmutableSet.of(injector.getInstance(BloomFilterOperatorConversion.class))
);
return getResults(
plannerConfig,
queryContext,
sql,
authenticationResult,
operatorTable,
CalciteTests.createExprMacroTable(),
CalciteTests.TEST_AUTHORIZER_MAPPER,
jsonMapper
);
}
}

View File

@ -45,7 +45,6 @@ import org.apache.druid.query.extraction.TimeFormatExtractionFn;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
@ -417,30 +416,7 @@ public class Expressions
// Create a BoundRefKey that strips the extractionFn and compares __time as a number.
final BoundRefKey boundRefKey = new BoundRefKey(column, null, StringComparators.NUMERIC);
switch (flippedKind) {
case EQUALS:
return rhsAligned
? Bounds.interval(boundRefKey, rhsInterval)
: Filtration.matchNothing();
case NOT_EQUALS:
return rhsAligned
? new NotDimFilter(Bounds.interval(boundRefKey, rhsInterval))
: Filtration.matchEverything();
case GREATER_THAN:
return Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
case GREATER_THAN_OR_EQUAL:
return rhsAligned
? Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getStartMillis()))
: Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
case LESS_THAN:
return rhsAligned
? Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getStartMillis()))
: Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
case LESS_THAN_OR_EQUAL:
return Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
default:
throw new IllegalStateException("WTF?! Shouldn't have got here...");
}
return getBoundTimeDimFilter(flippedKind, boundRefKey, rhsInterval, rhsAligned);
}
}
@ -492,27 +468,29 @@ public class Expressions
}
return filter;
} else if (kind == SqlKind.LIKE) {
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final DruidExpression druidExpression = toDruidExpression(
plannerContext,
rowSignature,
operands.get(0)
);
if (druidExpression == null || !druidExpression.isSimpleExtraction()) {
} else if (rexNode instanceof RexCall) {
final SqlOperator operator = ((RexCall) rexNode).getOperator();
final SqlOperatorConversion conversion =
plannerContext.getOperatorTable().lookupOperatorConversion(operator);
if (conversion == null) {
return null;
} else {
DimFilter filter = conversion.toDruidFilter(plannerContext, rowSignature, rexNode);
if (filter != null) {
return filter;
}
DruidExpression expression = conversion.toDruidExpression(plannerContext, rowSignature, rexNode);
if (expression != null) {
return new ExpressionDimFilter(expression.getExpression(), plannerContext.getExprMacroTable());
}
}
return new LikeDimFilter(
druidExpression.getSimpleExtraction().getColumn(),
RexLiteral.stringValue(operands.get(1)),
operands.size() > 2 ? RexLiteral.stringValue(operands.get(2)) : null,
druidExpression.getSimpleExtraction().getExtractionFn()
);
} else {
return null;
}
return null;
}
public static ExprType exprTypeForValueType(final ValueType valueType)
{
switch (valueType) {
@ -600,27 +578,38 @@ public class Expressions
// Is rhs aligned on granularity boundaries?
final boolean rhsAligned = rhsInterval.getStartMillis() == rhsMillis;
return getBoundTimeDimFilter(operatorKind, boundRefKey, rhsInterval, rhsAligned);
}
private static DimFilter getBoundTimeDimFilter(
SqlKind operatorKind,
BoundRefKey boundRefKey,
Interval interval,
boolean isAligned
)
{
switch (operatorKind) {
case EQUALS:
return rhsAligned
? Bounds.interval(boundRefKey, rhsInterval)
return isAligned
? Bounds.interval(boundRefKey, interval)
: Filtration.matchNothing();
case NOT_EQUALS:
return rhsAligned
? new NotDimFilter(Bounds.interval(boundRefKey, rhsInterval))
return isAligned
? new NotDimFilter(Bounds.interval(boundRefKey, interval))
: Filtration.matchEverything();
case GREATER_THAN:
return Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
return Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(interval.getEndMillis()));
case GREATER_THAN_OR_EQUAL:
return rhsAligned
? Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getStartMillis()))
: Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
return isAligned
? Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(interval.getStartMillis()))
: Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(interval.getEndMillis()));
case LESS_THAN:
return rhsAligned
? Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getStartMillis()))
: Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
return isAligned
? Bounds.lessThan(boundRefKey, String.valueOf(interval.getStartMillis()))
: Bounds.lessThan(boundRefKey, String.valueOf(interval.getEndMillis()));
case LESS_THAN_OR_EQUAL:
return Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
return Bounds.lessThan(boundRefKey, String.valueOf(interval.getEndMillis()));
default:
throw new IllegalStateException("WTF?! Shouldn't have got here...");
}

View File

@ -21,9 +21,12 @@ package org.apache.druid.sql.calcite.expression;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
public interface SqlOperatorConversion
{
/**
@ -44,9 +47,32 @@ public interface SqlOperatorConversion
*
* @see Expressions#toDruidExpression(PlannerContext, RowSignature, RexNode)
*/
DruidExpression toDruidExpression(
@Nullable
default DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
);
)
{
return null;
}
/**
* Returns a Druid Aggregation corresponding to a SQL {@link SqlOperator} used to filter rows
*
* @param plannerContext SQL planner context
* @param rowSignature signature of the rows being aggregated
* @param rexNode a rexBuilder, in case you need one
*
* @return filter, or null if the call cannot be translated
*/
@Nullable
default DimFilter toDruidFilter(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.expression.builtin;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
public class LikeOperatorConversion extends DirectOperatorConversion
{
private static final SqlOperator SQL_FUNCTION = SqlStdOperatorTable.LIKE;
public LikeOperatorConversion()
{
super(SQL_FUNCTION, "like");
}
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Nullable
@Override
public DimFilter toDruidFilter(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final DruidExpression druidExpression = Expressions.toDruidExpression(
plannerContext,
rowSignature,
operands.get(0)
);
if (druidExpression == null || !druidExpression.isSimpleExtraction()) {
return null;
}
return new LikeDimFilter(
druidExpression.getSimpleExtraction().getColumn(),
RexLiteral.stringValue(operands.get(1)),
operands.size() > 2 ? RexLiteral.stringValue(operands.get(2)) : null,
druidExpression.getSimpleExtraction().getExtractionFn()
);
}
}

View File

@ -35,7 +35,7 @@ import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
public class LookupOperatorConversion implements SqlOperatorConversion
public class QueryLookupOperatorConversion implements SqlOperatorConversion
{
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("LOOKUP")
@ -47,7 +47,7 @@ public class LookupOperatorConversion implements SqlOperatorConversion
private final LookupReferencesManager lookupReferencesManager;
@Inject
public LookupOperatorConversion(final LookupReferencesManager lookupReferencesManager)
public QueryLookupOperatorConversion(final LookupReferencesManager lookupReferencesManager)
{
this.lookupReferencesManager = lookupReferencesManager;
}

View File

@ -54,6 +54,7 @@ import org.apache.druid.sql.calcite.expression.builtin.DateTruncOperatorConversi
import org.apache.druid.sql.calcite.expression.builtin.ExtractOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.FloorOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.LTrimOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.LikeOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.MillisToTimestampOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.PositionOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.RTrimOperatorConversion;
@ -72,6 +73,7 @@ import org.apache.druid.sql.calcite.expression.builtin.TimestampToMillisOperator
import org.apache.druid.sql.calcite.expression.builtin.TrimOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.TruncateOperatorConversion;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -94,6 +96,7 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new SumZeroSqlAggregator())
.build();
// STRLEN has so many aliases.
private static final SqlOperatorConversion CHARACTER_LENGTH_CONVERSION = new DirectOperatorConversion(
SqlStdOperatorTable.CHARACTER_LENGTH,
@ -111,7 +114,6 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new DirectOperatorConversion(SqlStdOperatorTable.CONCAT, "concat"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.EXP, "exp"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.DIVIDE_INTEGER, "div"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LIKE, "like"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LN, "log"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LOWER, "lower"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LOG10, "log10"))
@ -140,20 +142,12 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new BinaryOperatorConversion(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, "<="))
.add(new BinaryOperatorConversion(SqlStdOperatorTable.AND, "&&"))
.add(new BinaryOperatorConversion(SqlStdOperatorTable.OR, "||"))
.add(new CastOperatorConversion())
// time operators
.add(new CeilOperatorConversion())
.add(new DateTruncOperatorConversion())
.add(new ExtractOperatorConversion())
.add(new FloorOperatorConversion())
.add(new MillisToTimestampOperatorConversion())
.add(new ReinterpretOperatorConversion())
.add(new RegexpExtractOperatorConversion())
.add(new PositionOperatorConversion())
.add(new StrposOperatorConversion())
.add(new SubstringOperatorConversion())
.add(new ConcatOperatorConversion())
.add(new TextcatOperatorConversion())
.add(new AliasedOperatorConversion(new SubstringOperatorConversion(), "SUBSTR"))
.add(new TimeArithmeticOperatorConversion.TimeMinusIntervalOperatorConversion())
.add(new TimeArithmeticOperatorConversion.TimePlusIntervalOperatorConversion())
.add(new TimeExtractOperatorConversion())
@ -162,12 +156,24 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new TimeParseOperatorConversion())
.add(new TimeShiftOperatorConversion())
.add(new TimestampToMillisOperatorConversion())
.add(new TruncateOperatorConversion())
.add(new TrimOperatorConversion())
// string operators
.add(new BTrimOperatorConversion())
.add(new LikeOperatorConversion())
.add(new LTrimOperatorConversion())
.add(new PositionOperatorConversion())
.add(new RegexpExtractOperatorConversion())
.add(new RTrimOperatorConversion())
.add(new StrposOperatorConversion())
.add(new SubstringOperatorConversion())
.add(new AliasedOperatorConversion(new SubstringOperatorConversion(), "SUBSTR"))
.add(new ConcatOperatorConversion())
.add(new TextcatOperatorConversion())
.add(new TrimOperatorConversion())
.add(new TruncateOperatorConversion())
.add(new AliasedOperatorConversion(new TruncateOperatorConversion(), "TRUNC"))
// value coercion operators
.add(new CastOperatorConversion())
.add(new ReinterpretOperatorConversion())
.build();
// Operators that have no conversion, but are handled in the convertlet table, so they still need to exist.
@ -222,6 +228,7 @@ public class DruidOperatorTable implements SqlOperatorTable
}
}
@Nullable
public SqlAggregator lookupAggregator(final SqlAggFunction aggFunction)
{
final SqlAggregator sqlAggregator = aggregators.get(OperatorKey.of(aggFunction));
@ -232,6 +239,7 @@ public class DruidOperatorTable implements SqlOperatorTable
}
}
@Nullable
public SqlOperatorConversion lookupOperatorConversion(final SqlOperator operator)
{
final SqlOperatorConversion operatorConversion = operatorConversions.get(OperatorKey.of(operator));
@ -250,6 +258,10 @@ public class DruidOperatorTable implements SqlOperatorTable
final List<SqlOperator> operatorList
)
{
if (opName == null) {
return;
}
if (opName.names.size() != 1) {
return;
}
@ -301,7 +313,7 @@ public class DruidOperatorTable implements SqlOperatorTable
private final String name;
private final SqlSyntax syntax;
public OperatorKey(final String name, final SqlSyntax syntax)
OperatorKey(final String name, final SqlSyntax syntax)
{
this.name = StringUtils.toLowerCase(Preconditions.checkNotNull(name, "name"));
this.syntax = normalizeSyntax(Preconditions.checkNotNull(syntax, "syntax"));

View File

@ -800,7 +800,6 @@ public class DruidQuery
if (sortProject != null) {
postAggregators.addAll(sortProject.getPostAggregators());
}
final Map<String, Object> theContext = new HashMap<>();
theContext.put("skipEmptyBuckets", true);
theContext.putAll(plannerContext.getQueryContext());

View File

@ -34,7 +34,7 @@ import org.apache.druid.sql.avatica.AvaticaMonitor;
import org.apache.druid.sql.avatica.AvaticaServerConfig;
import org.apache.druid.sql.avatica.DruidAvaticaHandler;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.builtin.LookupOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.schema.DruidSchema;
@ -71,8 +71,8 @@ public class SqlModule implements Module
// Add empty SqlAggregator binder.
Multibinder.newSetBinder(binder, SqlAggregator.class);
// LookupOperatorConversion isn't in DruidOperatorTable since it needs a LookupReferencesManager injected.
SqlBindings.addOperatorConversion(binder, LookupOperatorConversion.class);
// QueryLookupOperatorConversion isn't in DruidOperatorTable since it needs a LookupReferencesManager injected.
SqlBindings.addOperatorConversion(binder, QueryLookupOperatorConversion.class);
if (isJsonOverHttpEnabled()) {
Jerseys.addResource(binder, SqlResource.class);

View File

@ -45,13 +45,11 @@ import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
@ -107,50 +105,45 @@ public class SqlResource
try {
return Response
.ok(
new StreamingOutput()
{
@Override
public void write(final OutputStream outputStream) throws IOException, WebApplicationException
{
Yielder<Object[]> yielder = yielder0;
(StreamingOutput) outputStream -> {
Yielder<Object[]> yielder = yielder0;
try (final ResultFormat.Writer writer = sqlQuery.getResultFormat()
.createFormatter(outputStream, jsonMapper)) {
writer.writeResponseStart();
try (final ResultFormat.Writer writer = sqlQuery.getResultFormat()
.createFormatter(outputStream, jsonMapper)) {
writer.writeResponseStart();
if (sqlQuery.includeHeader()) {
writer.writeHeader(Arrays.asList(columnNames));
}
if (sqlQuery.includeHeader()) {
writer.writeHeader(Arrays.asList(columnNames));
}
while (!yielder.isDone()) {
final Object[] row = yielder.get();
writer.writeRowStart();
for (int i = 0; i < fieldList.size(); i++) {
final Object value;
while (!yielder.isDone()) {
final Object[] row = yielder.get();
writer.writeRowStart();
for (int i = 0; i < fieldList.size(); i++) {
final Object value;
if (timeColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteTimestampToJoda((long) row[i], timeZone)
);
} else if (dateColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteDateToJoda((int) row[i], timeZone)
);
} else {
value = row[i];
}
writer.writeRowField(fieldList.get(i).getName(), value);
if (timeColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteTimestampToJoda((long) row[i], timeZone)
);
} else if (dateColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteDateToJoda((int) row[i], timeZone)
);
} else {
value = row[i];
}
writer.writeRowEnd();
yielder = yielder.next(null);
}
writer.writeResponseEnd();
}
finally {
yielder.close();
writer.writeRowField(fieldList.get(i).getName(), value);
}
writer.writeRowEnd();
yielder = yielder.next(null);
}
writer.writeResponseEnd();
}
finally {
yielder.close();
}
}
)

View File

@ -0,0 +1,606 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.hll.VersionOneHyperLogLogCollector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.extraction.CascadeExtractionFn;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.select.PagingSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.view.InProcessViewManager;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BaseCalciteQueryTest extends CalciteTestBase
{
public static final String NULL_VALUE = NullHandling.replaceWithDefault() ? "" : null;
public static final String HLLC_STRING = VersionOneHyperLogLogCollector.class.getName();
public static final Logger log = new Logger(BaseCalciteQueryTest.class);
public static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
public static final PlannerConfig PLANNER_CONFIG_REQUIRE_TIME_CONDITION = new PlannerConfig()
{
@Override
public boolean isRequireTimeCondition()
{
return true;
}
};
public static final PlannerConfig PLANNER_CONFIG_NO_TOPN = new PlannerConfig()
{
@Override
public int getMaxTopNLimit()
{
return 0;
}
};
public static final PlannerConfig PLANNER_CONFIG_NO_HLL = new PlannerConfig()
{
@Override
public boolean isUseApproximateCountDistinct()
{
return false;
}
};
public static final PlannerConfig PLANNER_CONFIG_FALLBACK = new PlannerConfig()
{
@Override
public boolean isUseFallback()
{
return true;
}
};
public static final PlannerConfig PLANNER_CONFIG_SINGLE_NESTING_ONLY = new PlannerConfig()
{
@Override
public int getMaxQueryCount()
{
return 2;
}
};
public static final PlannerConfig PLANNER_CONFIG_NO_SUBQUERIES = new PlannerConfig()
{
@Override
public int getMaxQueryCount()
{
return 1;
}
};
public static final PlannerConfig PLANNER_CONFIG_LOS_ANGELES = new PlannerConfig()
{
@Override
public DateTimeZone getSqlTimeZone()
{
return DateTimes.inferTzfromString("America/Los_Angeles");
}
};
public static final PlannerConfig PLANNER_CONFIG_SEMI_JOIN_ROWS_LIMIT = new PlannerConfig()
{
@Override
public int getMaxSemiJoinRowsInMemory()
{
return 2;
}
};
public static final String LOS_ANGELES = "America/Los_Angeles";
public static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
public static final Map<String, Object> QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
"skipEmptyBuckets", false,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
public static final Map<String, Object> QUERY_CONTEXT_NO_TOPN = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false",
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
public static final Map<String, Object> QUERY_CONTEXT_LOS_ANGELES = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
// Matches QUERY_CONTEXT_DEFAULT
public static final Map<String, Object> TIMESERIES_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
"skipEmptyBuckets", true,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
// Matches QUERY_CONTEXT_LOS_ANGELES
public static final Map<String, Object> TIMESERIES_CONTEXT_LOS_ANGELES = new HashMap<>();
public static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true);
public static QueryRunnerFactoryConglomerate conglomerate;
public static Closer resourceCloser;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
public SpecificSegmentsQuerySegmentWalker walker = null;
public QueryLogHook queryLogHook;
{
TIMESERIES_CONTEXT_LOS_ANGELES.put(PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z");
TIMESERIES_CONTEXT_LOS_ANGELES.put(PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES);
TIMESERIES_CONTEXT_LOS_ANGELES.put("skipEmptyBuckets", true);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
}
// Generate timestamps for expected results
public static long T(final String timeString)
{
return Calcites.jodaToCalciteTimestamp(DateTimes.of(timeString), DateTimeZone.UTC);
}
// Generate timestamps for expected results
public static long T(final String timeString, final String timeZoneString)
{
final DateTimeZone timeZone = DateTimes.inferTzfromString(timeZoneString);
return Calcites.jodaToCalciteTimestamp(new DateTime(timeString, timeZone), timeZone);
}
// Generate day numbers for expected results
public static int D(final String dayString)
{
return (int) (Intervals.utc(T("1970"), T(dayString)).toDurationMillis() / (86400L * 1000L));
}
public static QuerySegmentSpec QSS(final Interval... intervals)
{
return new MultipleIntervalSegmentSpec(Arrays.asList(intervals));
}
public static AndDimFilter AND(DimFilter... filters)
{
return new AndDimFilter(Arrays.asList(filters));
}
public static OrDimFilter OR(DimFilter... filters)
{
return new OrDimFilter(Arrays.asList(filters));
}
public static NotDimFilter NOT(DimFilter filter)
{
return new NotDimFilter(filter);
}
public static InDimFilter IN(String dimension, List<String> values, ExtractionFn extractionFn)
{
return new InDimFilter(dimension, values, extractionFn);
}
public static SelectorDimFilter SELECTOR(final String fieldName, final String value, final ExtractionFn extractionFn)
{
return new SelectorDimFilter(fieldName, value, extractionFn);
}
public static ExpressionDimFilter EXPRESSION_FILTER(final String expression)
{
return new ExpressionDimFilter(expression, CalciteTests.createExprMacroTable());
}
public static DimFilter NUMERIC_SELECTOR(
final String fieldName,
final String value,
final ExtractionFn extractionFn
)
{
// We use Bound filters for numeric equality to achieve "10.0" = "10"
return BOUND(fieldName, value, value, false, false, extractionFn, StringComparators.NUMERIC);
}
public static BoundDimFilter BOUND(
final String fieldName,
final String lower,
final String upper,
final boolean lowerStrict,
final boolean upperStrict,
final ExtractionFn extractionFn,
final StringComparator comparator
)
{
return new BoundDimFilter(fieldName, lower, upper, lowerStrict, upperStrict, null, extractionFn, comparator);
}
public static BoundDimFilter TIME_BOUND(final Object intervalObj)
{
final Interval interval = new Interval(intervalObj, ISOChronology.getInstanceUTC());
return new BoundDimFilter(
ColumnHolder.TIME_COLUMN_NAME,
String.valueOf(interval.getStartMillis()),
String.valueOf(interval.getEndMillis()),
false,
true,
null,
null,
StringComparators.NUMERIC
);
}
public static CascadeExtractionFn CASCADE(final ExtractionFn... fns)
{
return new CascadeExtractionFn(fns);
}
public static List<DimensionSpec> DIMS(final DimensionSpec... dimensionSpecs)
{
return Arrays.asList(dimensionSpecs);
}
public static List<AggregatorFactory> AGGS(final AggregatorFactory... aggregators)
{
return Arrays.asList(aggregators);
}
public static DimFilterHavingSpec HAVING(final DimFilter filter)
{
return new DimFilterHavingSpec(filter, true);
}
public static ExpressionVirtualColumn EXPRESSION_VIRTUAL_COLUMN(
final String name,
final String expression,
final ValueType outputType
)
{
return new ExpressionVirtualColumn(name, expression, outputType, CalciteTests.createExprMacroTable());
}
public static ExpressionPostAggregator EXPRESSION_POST_AGG(final String name, final String expression)
{
return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable());
}
public static ScanQuery.ScanQueryBuilder newScanQueryBuilder()
{
return new ScanQuery.ScanQueryBuilder().resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false);
}
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
}
@AfterClass
public static void tearDownClass() throws IOException
{
resourceCloser.close();
}
@Rule
public QueryLogHook getQueryLogHook()
{
return queryLogHook = QueryLogHook.create();
}
@Before
public void setUp() throws Exception
{
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
}
@After
public void tearDown() throws Exception
{
walker.close();
walker = null;
}
public void assertQueryIsUnplannable(final String sql)
{
assertQueryIsUnplannable(PLANNER_CONFIG_DEFAULT, sql);
}
public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final String sql)
{
Exception e = null;
try {
testQuery(plannerConfig, sql, CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of(), ImmutableList.of());
}
catch (Exception e1) {
e = e1;
}
if (!(e instanceof RelOptPlanner.CannotPlanException)) {
log.error(e, "Expected CannotPlanException for query: %s", sql);
Assert.fail(sql);
}
}
/**
* Provided for tests that wish to check multiple queries instead of relying on ExpectedException.
*/
public void assertQueryIsForbidden(final String sql, final AuthenticationResult authenticationResult)
{
assertQueryIsForbidden(PLANNER_CONFIG_DEFAULT, sql, authenticationResult);
}
public void assertQueryIsForbidden(
final PlannerConfig plannerConfig,
final String sql,
final AuthenticationResult authenticationResult
)
{
Exception e = null;
try {
testQuery(plannerConfig, sql, authenticationResult, ImmutableList.of(), ImmutableList.of());
}
catch (Exception e1) {
e = e1;
}
if (!(e instanceof ForbiddenException)) {
log.error(e, "Expected ForbiddenException for query: %s with authResult: %s", sql, authenticationResult);
Assert.fail(sql);
}
}
public void testQuery(
final String sql,
final List<Query> expectedQueries,
final List<Object[]> expectedResults
) throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT,
QUERY_CONTEXT_DEFAULT,
sql,
CalciteTests.REGULAR_USER_AUTH_RESULT,
expectedQueries,
expectedResults
);
}
public void testQuery(
final String sql,
final Map<String, Object> queryContext,
final List<Query> expectedQueries,
final List<Object[]> expectedResults
) throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT,
queryContext,
sql,
CalciteTests.REGULAR_USER_AUTH_RESULT,
expectedQueries,
expectedResults
);
}
public void testQuery(
final PlannerConfig plannerConfig,
final String sql,
final AuthenticationResult authenticationResult,
final List<Query> expectedQueries,
final List<Object[]> expectedResults
) throws Exception
{
testQuery(plannerConfig, QUERY_CONTEXT_DEFAULT, sql, authenticationResult, expectedQueries, expectedResults);
}
public void testQuery(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
final String sql,
final AuthenticationResult authenticationResult,
final List<Query> expectedQueries,
final List<Object[]> expectedResults
) throws Exception
{
log.info("SQL: %s", sql);
queryLogHook.clearRecordedQueries();
final List<Object[]> plannerResults = getResults(plannerConfig, queryContext, sql, authenticationResult);
verifyResults(sql, expectedQueries, expectedResults, plannerResults);
}
public List<Object[]> getResults(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
final String sql,
final AuthenticationResult authenticationResult
) throws Exception
{
return getResults(
plannerConfig,
queryContext,
sql,
authenticationResult,
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
CalciteTests.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper()
);
}
public List<Object[]> getResults(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
final String sql,
final AuthenticationResult authenticationResult,
final DruidOperatorTable operatorTable,
final ExprMacroTable macroTable,
final AuthorizerMapper authorizerMapper,
final ObjectMapper objectMapper
) throws Exception
{
final InProcessViewManager viewManager = new InProcessViewManager(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig, viewManager);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
authorizerMapper,
objectMapper
);
viewManager.createView(
plannerFactory,
"aview",
"SELECT SUBSTRING(dim1, 1, 1) AS dim1_firstchar FROM foo WHERE dim2 = 'a'"
);
viewManager.createView(
plannerFactory,
"bview",
"SELECT COUNT(*) FROM druid.foo\n"
+ "WHERE __time >= CURRENT_TIMESTAMP + INTERVAL '1' DAY AND __time < TIMESTAMP '2002-01-01 00:00:00'"
);
try (DruidPlanner planner = plannerFactory.createPlanner(queryContext)) {
final PlannerResult plan = planner.plan(sql, authenticationResult);
return plan.run().toList();
}
}
public void verifyResults(
final String sql,
final List<Query> expectedQueries,
final List<Object[]> expectedResults,
final List<Object[]> results
)
{
for (int i = 0; i < results.size(); i++) {
log.info("row #%d: %s", i, Arrays.toString(results.get(i)));
}
Assert.assertEquals(StringUtils.format("result count: %s", sql), expectedResults.size(), results.size());
for (int i = 0; i < results.size(); i++) {
Assert.assertArrayEquals(
StringUtils.format("result #%d: %s", i + 1, sql),
expectedResults.get(i),
results.get(i)
);
}
if (expectedQueries != null) {
final List<Query> recordedQueries = queryLogHook.getRecordedQueries();
Assert.assertEquals(
StringUtils.format("query count: %s", sql),
expectedQueries.size(),
recordedQueries.size()
);
for (int i = 0; i < expectedQueries.size(); i++) {
Assert.assertEquals(
StringUtils.format("query #%d: %s", i + 1, sql),
expectedQueries.get(i),
recordedQueries.get(i)
);
}
}
}
}

View File

@ -22,26 +22,16 @@ package org.apache.druid.sql.calcite;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.hll.VersionOneHyperLogLogCollector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -55,253 +45,48 @@ import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFacto
import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.extraction.CascadeExtractionFn;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.RegexDimExtractionFn;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.select.PagingSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.topn.DimensionTopNMetricSpec;
import org.apache.druid.query.topn.InvertedTopNMetricSpec;
import org.apache.druid.query.topn.NumericTopNMetricSpec;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.view.InProcessViewManager;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CalciteQueryTest extends CalciteTestBase
public class CalciteQueryTest extends BaseCalciteQueryTest
{
private static final String NULL_VALUE = NullHandling.replaceWithDefault() ? "" : null;
private static final String HLLC_STRING = VersionOneHyperLogLogCollector.class.getName();
private static final Logger log = new Logger(CalciteQueryTest.class);
private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
private static final PlannerConfig PLANNER_CONFIG_REQUIRE_TIME_CONDITION = new PlannerConfig()
{
@Override
public boolean isRequireTimeCondition()
{
return true;
}
};
private static final PlannerConfig PLANNER_CONFIG_NO_TOPN = new PlannerConfig()
{
@Override
public int getMaxTopNLimit()
{
return 0;
}
};
private static final PlannerConfig PLANNER_CONFIG_NO_HLL = new PlannerConfig()
{
@Override
public boolean isUseApproximateCountDistinct()
{
return false;
}
};
private static final PlannerConfig PLANNER_CONFIG_FALLBACK = new PlannerConfig()
{
@Override
public boolean isUseFallback()
{
return true;
}
};
private static final PlannerConfig PLANNER_CONFIG_SINGLE_NESTING_ONLY = new PlannerConfig()
{
@Override
public int getMaxQueryCount()
{
return 2;
}
};
private static final PlannerConfig PLANNER_CONFIG_NO_SUBQUERIES = new PlannerConfig()
{
@Override
public int getMaxQueryCount()
{
return 1;
}
};
private static final PlannerConfig PLANNER_CONFIG_LOS_ANGELES = new PlannerConfig()
{
@Override
public DateTimeZone getSqlTimeZone()
{
return DateTimes.inferTzfromString("America/Los_Angeles");
}
};
private static final PlannerConfig PLANNER_CONFIG_SEMI_JOIN_ROWS_LIMIT = new PlannerConfig()
{
@Override
public int getMaxSemiJoinRowsInMemory()
{
return 2;
}
};
private static final String LOS_ANGELES = "America/Los_Angeles";
private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
private static final Map<String, Object> QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
"skipEmptyBuckets", false,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
private static final Map<String, Object> QUERY_CONTEXT_NO_TOPN = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false",
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
private static final Map<String, Object> QUERY_CONTEXT_LOS_ANGELES = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
// Matches QUERY_CONTEXT_DEFAULT
public static final Map<String, Object> TIMESERIES_CONTEXT_DEFAULT = ImmutableMap.of(
PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
"skipEmptyBuckets", true,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
);
// Matches QUERY_CONTEXT_LOS_ANGELES
public static final Map<String, Object> TIMESERIES_CONTEXT_LOS_ANGELES = new HashMap<>();
{
TIMESERIES_CONTEXT_LOS_ANGELES.put(PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z");
TIMESERIES_CONTEXT_LOS_ANGELES.put(PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES);
TIMESERIES_CONTEXT_LOS_ANGELES.put("skipEmptyBuckets", true);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
}
private static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true);
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
}
@AfterClass
public static void tearDownClass() throws IOException
{
resourceCloser.close();
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
private SpecificSegmentsQuerySegmentWalker walker = null;
@Before
public void setUp() throws Exception
{
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
}
@After
public void tearDown() throws Exception
{
walker.close();
walker = null;
}
@Test
public void testSelectConstantExpression() throws Exception
{
@ -2060,55 +1845,6 @@ public class CalciteQueryTest extends CalciteTestBase
}
}
private void assertQueryIsUnplannable(final String sql)
{
assertQueryIsUnplannable(PLANNER_CONFIG_DEFAULT, sql);
}
private void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final String sql)
{
Exception e = null;
try {
testQuery(plannerConfig, sql, CalciteTests.REGULAR_USER_AUTH_RESULT, ImmutableList.of(), ImmutableList.of());
}
catch (Exception e1) {
e = e1;
}
if (!(e instanceof RelOptPlanner.CannotPlanException)) {
log.error(e, "Expected CannotPlanException for query: %s", sql);
Assert.fail(sql);
}
}
/**
* Provided for tests that wish to check multiple queries instead of relying on ExpectedException.
*/
private void assertQueryIsForbidden(final String sql, final AuthenticationResult authenticationResult)
{
assertQueryIsForbidden(PLANNER_CONFIG_DEFAULT, sql, authenticationResult);
}
private void assertQueryIsForbidden(
final PlannerConfig plannerConfig,
final String sql,
final AuthenticationResult authenticationResult
)
{
Exception e = null;
try {
testQuery(plannerConfig, sql, authenticationResult, ImmutableList.of(), ImmutableList.of());
}
catch (Exception e1) {
e = e1;
}
if (!(e instanceof ForbiddenException)) {
log.error(e, "Expected ForbiddenException for query: %s with authResult: %s", sql, authenticationResult);
Assert.fail(sql);
}
}
@Test
public void testSelectStarWithDimFilter() throws Exception
{
@ -7666,259 +7402,4 @@ public class CalciteQueryTest extends CalciteTestBase
ImmutableList.of()
);
}
private void testQuery(
final String sql,
final List<Query> expectedQueries,
final List<Object[]> expectedResults
) throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT,
QUERY_CONTEXT_DEFAULT,
sql,
CalciteTests.REGULAR_USER_AUTH_RESULT,
expectedQueries,
expectedResults
);
}
private void testQuery(
final PlannerConfig plannerConfig,
final String sql,
final AuthenticationResult authenticationResult,
final List<Query> expectedQueries,
final List<Object[]> expectedResults
) throws Exception
{
testQuery(plannerConfig, QUERY_CONTEXT_DEFAULT, sql, authenticationResult, expectedQueries, expectedResults);
}
private void testQuery(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
final String sql,
final AuthenticationResult authenticationResult,
final List<Query> expectedQueries,
final List<Object[]> expectedResults
) throws Exception
{
log.info("SQL: %s", sql);
queryLogHook.clearRecordedQueries();
final List<Object[]> plannerResults = getResults(plannerConfig, queryContext, sql, authenticationResult);
verifyResults(sql, expectedQueries, expectedResults, plannerResults);
}
private List<Object[]> getResults(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
final String sql,
final AuthenticationResult authenticationResult
) throws Exception
{
final InProcessViewManager viewManager = new InProcessViewManager(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig, viewManager);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
CalciteTests.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper()
);
viewManager.createView(
plannerFactory,
"aview",
"SELECT SUBSTRING(dim1, 1, 1) AS dim1_firstchar FROM foo WHERE dim2 = 'a'"
);
viewManager.createView(
plannerFactory,
"bview",
"SELECT COUNT(*) FROM druid.foo\n"
+ "WHERE __time >= CURRENT_TIMESTAMP + INTERVAL '1' DAY AND __time < TIMESTAMP '2002-01-01 00:00:00'"
);
try (DruidPlanner planner = plannerFactory.createPlanner(queryContext)) {
final PlannerResult plan = planner.plan(sql, authenticationResult);
return plan.run().toList();
}
}
private void verifyResults(
final String sql,
final List<Query> expectedQueries,
final List<Object[]> expectedResults,
final List<Object[]> results
)
{
for (int i = 0; i < results.size(); i++) {
log.info("row #%d: %s", i, Arrays.toString(results.get(i)));
}
Assert.assertEquals(StringUtils.format("result count: %s", sql), expectedResults.size(), results.size());
for (int i = 0; i < results.size(); i++) {
Assert.assertArrayEquals(
StringUtils.format("result #%d: %s", i + 1, sql),
expectedResults.get(i),
results.get(i)
);
}
if (expectedQueries != null) {
final List<Query> recordedQueries = queryLogHook.getRecordedQueries();
Assert.assertEquals(
StringUtils.format("query count: %s", sql),
expectedQueries.size(),
recordedQueries.size()
);
for (int i = 0; i < expectedQueries.size(); i++) {
Assert.assertEquals(
StringUtils.format("query #%d: %s", i + 1, sql),
expectedQueries.get(i),
recordedQueries.get(i)
);
}
}
}
// Generate timestamps for expected results
private static long T(final String timeString)
{
return Calcites.jodaToCalciteTimestamp(DateTimes.of(timeString), DateTimeZone.UTC);
}
// Generate timestamps for expected results
private static long T(final String timeString, final String timeZoneString)
{
final DateTimeZone timeZone = DateTimes.inferTzfromString(timeZoneString);
return Calcites.jodaToCalciteTimestamp(new DateTime(timeString, timeZone), timeZone);
}
// Generate day numbers for expected results
private static int D(final String dayString)
{
return (int) (Intervals.utc(T("1970"), T(dayString)).toDurationMillis() / (86400L * 1000L));
}
private static QuerySegmentSpec QSS(final Interval... intervals)
{
return new MultipleIntervalSegmentSpec(Arrays.asList(intervals));
}
private static AndDimFilter AND(DimFilter... filters)
{
return new AndDimFilter(Arrays.asList(filters));
}
private static OrDimFilter OR(DimFilter... filters)
{
return new OrDimFilter(Arrays.asList(filters));
}
private static NotDimFilter NOT(DimFilter filter)
{
return new NotDimFilter(filter);
}
private static InDimFilter IN(String dimension, List<String> values, ExtractionFn extractionFn)
{
return new InDimFilter(dimension, values, extractionFn);
}
private static SelectorDimFilter SELECTOR(final String fieldName, final String value, final ExtractionFn extractionFn)
{
return new SelectorDimFilter(fieldName, value, extractionFn);
}
private static ExpressionDimFilter EXPRESSION_FILTER(final String expression)
{
return new ExpressionDimFilter(expression, CalciteTests.createExprMacroTable());
}
private static DimFilter NUMERIC_SELECTOR(
final String fieldName,
final String value,
final ExtractionFn extractionFn
)
{
// We use Bound filters for numeric equality to achieve "10.0" = "10"
return BOUND(fieldName, value, value, false, false, extractionFn, StringComparators.NUMERIC);
}
private static BoundDimFilter BOUND(
final String fieldName,
final String lower,
final String upper,
final boolean lowerStrict,
final boolean upperStrict,
final ExtractionFn extractionFn,
final StringComparator comparator
)
{
return new BoundDimFilter(fieldName, lower, upper, lowerStrict, upperStrict, null, extractionFn, comparator);
}
private static BoundDimFilter TIME_BOUND(final Object intervalObj)
{
final Interval interval = new Interval(intervalObj, ISOChronology.getInstanceUTC());
return new BoundDimFilter(
ColumnHolder.TIME_COLUMN_NAME,
String.valueOf(interval.getStartMillis()),
String.valueOf(interval.getEndMillis()),
false,
true,
null,
null,
StringComparators.NUMERIC
);
}
private static CascadeExtractionFn CASCADE(final ExtractionFn... fns)
{
return new CascadeExtractionFn(fns);
}
private static List<DimensionSpec> DIMS(final DimensionSpec... dimensionSpecs)
{
return Arrays.asList(dimensionSpecs);
}
private static List<AggregatorFactory> AGGS(final AggregatorFactory... aggregators)
{
return Arrays.asList(aggregators);
}
private static DimFilterHavingSpec HAVING(final DimFilter filter)
{
return new DimFilterHavingSpec(filter, true);
}
private static ExpressionVirtualColumn EXPRESSION_VIRTUAL_COLUMN(
final String name,
final String expression,
final ValueType outputType
)
{
return new ExpressionVirtualColumn(name, expression, outputType, CalciteTests.createExprMacroTable());
}
private static ExpressionPostAggregator EXPRESSION_POST_AGG(final String name, final String expression)
{
return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable());
}
private static ScanQuery.ScanQueryBuilder newScanQueryBuilder()
{
return new ScanQuery.ScanQueryBuilder().resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false);
}
}

View File

@ -26,7 +26,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -103,7 +102,6 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
@ -113,10 +111,9 @@ import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.LookupOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.schema.DruidSchema;
@ -153,20 +150,15 @@ public class CalciteTests
@Override
public Authorizer getAuthorizer(String name)
{
return new Authorizer()
{
@Override
public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action)
{
if (authenticationResult.getIdentity().equals(TEST_SUPERUSER_NAME)) {
return Access.OK;
}
return (authenticationResult, resource, action) -> {
if (authenticationResult.getIdentity().equals(TEST_SUPERUSER_NAME)) {
return Access.OK;
}
if (resource.getType() == ResourceType.DATASOURCE && resource.getName().equals(FORBIDDEN_DATASOURCE)) {
return new Access(false);
} else {
return Access.OK;
}
if (resource.getType() == ResourceType.DATASOURCE && resource.getName().equals(FORBIDDEN_DATASOURCE)) {
return new Access(false);
} else {
return Access.OK;
}
};
}
@ -221,25 +213,20 @@ public class CalciteTests
);
private static final Injector INJECTOR = Guice.createInjector(
new Module()
{
@Override
public void configure(final Binder binder)
{
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
(Module) binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
// This Module is just to get a LookupReferencesManager with a usable "lookyloo" lookup.
// This Module is just to get a LookupReferencesManager with a usable "lookyloo" lookup.
binder.bind(LookupReferencesManager.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"
)
)
);
binder.bind(LookupReferencesManager.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
ImmutableMap.of(
"a", "xa",
"abc", "xabc"
)
)
);
}
}
);
@ -349,14 +336,7 @@ public class CalciteTests
final Closer resourceCloser = Closer.create();
final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(10 * 1024 * 1024);
}
}
() -> ByteBuffer.allocate(10 * 1024 * 1024)
);
resourceCloser.register(stupidPool);
final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = GroupByQueryRunnerTest
@ -554,7 +534,7 @@ public class CalciteTests
{
try {
final Set<SqlOperatorConversion> extractionOperators = new HashSet<>();
extractionOperators.add(INJECTOR.getInstance(LookupOperatorConversion.class));
extractionOperators.add(INJECTOR.getInstance(QueryLookupOperatorConversion.class));
return new DruidOperatorTable(ImmutableSet.of(), extractionOperators);
}
catch (Exception e) {