SQL join support for lookups. (#9294)

* SQL join support for lookups.

1) Add LookupSchema to SQL, so lookups show up in the catalog.
2) Add join-related rels and rules to SQL, allowing joins to be planned into
   native Druid queries.
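
For example (illustrative; taken from the new tests in this patch), a query like the following can now be planned into a single native Druid query, with the lookup becoming a LookupDataSource on the right-hand side of a JoinDataSource:

    SELECT lookyloo.v, COUNT(*)
    FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k
    GROUP BY lookyloo.v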

* Add two missing LookupSchema calls in tests.

* Fix tests.

* Fix typo.
Gian Merlino 2020-01-31 23:51:16 -08:00 committed by GitHub
parent 660f8838f4
commit b411443d22
36 changed files with 2112 additions and 257 deletions


@ -198,6 +198,7 @@ public class SqlBenchmark
plannerFactory = new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate.lhs),
CalciteTests.createOperatorTable(),


@ -121,6 +121,7 @@ public class SqlVsNativeBenchmark
plannerFactory = new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),


@ -160,6 +160,7 @@ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -188,6 +188,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -189,6 +189,7 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -186,6 +186,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -202,6 +202,7 @@ public class BloomFilterSqlAggregatorTest extends InitializedNullHandlingTest
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -172,6 +172,7 @@ public class FixedBucketsHistogramQuantileSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -171,6 +171,7 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -174,6 +174,7 @@ public class VarianceSqlAggregatorTest extends InitializedNullHandlingTest
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -36,7 +36,7 @@ public class MapJoinableFactory implements JoinableFactory
private final Map<Class<? extends DataSource>, JoinableFactory> joinableFactories;
@Inject
MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
public MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
{
// Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap.
// Class doesn't override Object.equals().


@ -48,7 +48,7 @@ import org.joda.time.Interval;
public class ClientQuerySegmentWalker implements QuerySegmentWalker
{
private final ServiceEmitter emitter;
private final CachingClusteredClient baseClient;
private final QuerySegmentWalker baseClient;
private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
@ -56,10 +56,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
@Inject
public ClientQuerySegmentWalker(
ServiceEmitter emitter,
CachingClusteredClient baseClient,
QuerySegmentWalker baseClient,
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
@ -78,6 +77,30 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
this.cacheConfig = cacheConfig;
}
@Inject
ClientQuerySegmentWalker(
ServiceEmitter emitter,
CachingClusteredClient baseClient,
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
ServerConfig serverConfig,
Cache cache,
CacheConfig cacheConfig
)
{
this(
emitter,
(QuerySegmentWalker) baseClient,
warehouse,
retryConfig,
objectMapper,
serverConfig,
cache,
cacheConfig
);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{


@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import org.apache.calcite.jdbc.CalciteSchema;
@ -46,6 +47,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.InformationSchema;
import org.apache.druid.sql.calcite.schema.LookupSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -55,8 +57,10 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.nio.charset.Charset;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.regex.Pattern;
/**
@ -104,12 +108,14 @@ public class Calcites
public static SchemaPlus createRootSchema(
final DruidSchema druidSchema,
final LookupSchema lookupSchema,
final SystemSchema systemSchema,
final AuthorizerMapper authorizerMapper
)
{
final SchemaPlus rootSchema = CalciteSchema.createRootSchema(false, false).plus();
rootSchema.add(DruidSchema.NAME, druidSchema);
rootSchema.add(LookupSchema.NAME, lookupSchema);
rootSchema.add(InformationSchema.NAME, new InformationSchema(rootSchema, authorizerMapper));
rootSchema.add(SystemSchema.NAME, systemSchema);
return rootSchema;
@ -137,6 +143,7 @@ public class Calcites
}
@Nullable
public static ValueType getValueTypeForSqlTypeName(SqlTypeName sqlTypeName)
{
if (SqlTypeName.FLOAT == sqlTypeName) {
@ -345,23 +352,23 @@ public class Calcites
}
/**
* Checks if a RexNode is a literal int or not. If this returns true, then {@code RexLiteral.intValue(literal)} can be
* used to get the value of the literal.
*
* @param rexNode the node
*
* @return true if this is an int
* Finds a string that is either equal to "basePrefix" or is basePrefix prepended by underscores, such that nothing in
* "strings" starts with that string followed by a digit.
*/
public static boolean isIntLiteral(final RexNode rexNode)
public static String findUnusedPrefixForDigits(final String basePrefix, final Iterable<String> strings)
{
return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
}
final NavigableSet<String> navigableStrings;
if (strings instanceof NavigableSet) {
navigableStrings = (NavigableSet<String>) strings;
} else {
navigableStrings = new TreeSet<>();
Iterables.addAll(navigableStrings, strings);
}
public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
{
String prefix = basePrefix;
while (!isUnusedPrefix(prefix, strings)) {
while (!isUnusedPrefix(prefix, navigableStrings)) {
prefix = "_" + prefix;
}
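
A minimal sketch of the renamed helper's behavior (hypothetical inputs, not part of this patch):

  // "d" is rejected because "d0" starts with "d" followed by a digit, so an underscore is prepended.
  Calcites.findUnusedPrefixForDigits("d", ImmutableSortedSet.of("d0", "dim1"));  // returns "_d"

  // Nothing here starts with "j" plus a digit, so "j" is returned as-is; callers append "0." themselves.
  Calcites.findUnusedPrefixForDigits("j", ImmutableSortedSet.of("dim1", "dim2"));  // returns "j"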


@ -42,6 +42,7 @@ import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.calcite.rel.QueryMaker;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.LookupSchema;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import java.util.Map;
@ -59,6 +60,7 @@ public class PlannerFactory
.build();
private final DruidSchema druidSchema;
private final LookupSchema lookupSchema;
private final SystemSchema systemSchema;
private final QueryLifecycleFactory queryLifecycleFactory;
private final DruidOperatorTable operatorTable;
@ -70,6 +72,7 @@ public class PlannerFactory
@Inject
public PlannerFactory(
final DruidSchema druidSchema,
final LookupSchema lookupSchema,
final SystemSchema systemSchema,
final QueryLifecycleFactory queryLifecycleFactory,
final DruidOperatorTable operatorTable,
@ -80,6 +83,7 @@ public class PlannerFactory
)
{
this.druidSchema = druidSchema;
this.lookupSchema = lookupSchema;
this.systemSchema = systemSchema;
this.queryLifecycleFactory = queryLifecycleFactory;
this.operatorTable = operatorTable;
@ -96,6 +100,7 @@ public class PlannerFactory
{
final SchemaPlus rootSchema = Calcites.createRootSchema(
druidSchema,
lookupSchema,
systemSchema,
authorizerMapper
);


@ -101,7 +101,7 @@ public class Rules
JoinPushExpressionsRule.INSTANCE,
FilterAggregateTransposeRule.INSTANCE,
ProjectWindowTransposeRule.INSTANCE,
JoinCommuteRule.INSTANCE,
JoinCommuteRule.SWAP_OUTER,
JoinPushThroughJoinRule.RIGHT,
JoinPushThroughJoinRule.LEFT,
SortProjectTransposeRule.INSTANCE,
@ -130,13 +130,13 @@ public class Rules
AggregateValuesRule.INSTANCE
);
// Rules from VolcanoPlanner's registerAbstractRelationalRules.
// Rules from VolcanoPlanner's registerAbstractRelationalRules, minus JoinCommuteRule since it's already
// in DEFAULT_RULES.
private static final List<RelOptRule> VOLCANO_ABSTRACT_RULES =
ImmutableList.of(
FilterJoinRule.FILTER_ON_JOIN,
FilterJoinRule.JOIN,
AbstractConverter.ExpandConversionRule.INSTANCE,
JoinCommuteRule.INSTANCE,
AggregateRemoveRule.INSTANCE,
UnionToDistinctRule.INSTANCE,
ProjectRemoveRule.INSTANCE,


@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.table.RowSignature;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* DruidRel that uses a {@link JoinDataSource}.
*/
public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
{
private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__");
private static final double COST_FACTOR = 100.0;
private final PartialDruidQuery partialQuery;
private final Join joinRel;
private RelNode left;
private RelNode right;
private DruidJoinQueryRel(
RelOptCluster cluster,
RelTraitSet traitSet,
Join joinRel,
PartialDruidQuery partialQuery,
QueryMaker queryMaker
)
{
super(cluster, traitSet, queryMaker);
this.joinRel = joinRel;
this.left = joinRel.getLeft();
this.right = joinRel.getRight();
this.partialQuery = partialQuery;
}
public static DruidJoinQueryRel create(final Join joinRel, final QueryMaker queryMaker)
{
return new DruidJoinQueryRel(
joinRel.getCluster(),
joinRel.getTraitSet(),
joinRel,
PartialDruidQuery.create(joinRel),
queryMaker
);
}
@Override
public PartialDruidQuery getPartialDruidQuery()
{
return partialQuery;
}
@Override
public Sequence<Object[]> runQuery()
{
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
// is the outermost query and it will actually get run as a native query. Druid's native query layer will
// finalize aggregations for the outermost query even if we don't explicitly ask it to.
final DruidQuery query = toDruidQuery(false);
return getQueryMaker().runQuery(query);
}
@Override
public DruidJoinQueryRel withPartialQuery(final PartialDruidQuery newQueryBuilder)
{
return new DruidJoinQueryRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
joinRel,
newQueryBuilder,
getQueryMaker()
);
}
@Override
public int getQueryCount()
{
return ((DruidRel<?>) left).getQueryCount() + ((DruidRel<?>) right).getQueryCount();
}
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final DruidRel<?> leftDruidRel = (DruidRel<?>) left;
final DruidQuery leftQuery = Preconditions.checkNotNull((leftDruidRel).toDruidQuery(false), "leftQuery");
final RowSignature leftSignature = leftQuery.getOutputRowSignature();
final DataSource leftDataSource;
final DruidRel<?> rightDruidRel = (DruidRel<?>) right;
final DruidQuery rightQuery = Preconditions.checkNotNull(rightDruidRel.toDruidQuery(false), "rightQuery");
final RowSignature rightSignature = rightQuery.getOutputRowSignature();
final DataSource rightDataSource;
// Left rel: allow direct embedding of scans/mappings including those of joins.
if (DruidRels.isScanOrMapping(leftDruidRel, true)) {
leftDataSource = leftQuery.getDataSource();
} else {
leftDataSource = new QueryDataSource(leftQuery.getQuery());
}
// Right rel: allow direct embedding of scans/mappings, excluding joins (those must be done as subqueries).
if (DruidRels.isScanOrMapping(rightDruidRel, false)) {
rightDataSource = rightQuery.getDataSource();
} else {
rightDataSource = new QueryDataSource(rightQuery.getQuery());
}
final Pair<String, RowSignature> prefixSignaturePair = computeJoinRowSignature(leftSignature, rightSignature);
// Generate the condition for this join as a Druid expression.
final DruidExpression condition = Expressions.toDruidExpression(
getPlannerContext(),
prefixSignaturePair.rhs,
joinRel.getCondition()
);
// DruidJoinRule should not have created us if "condition" is null. Check defensively anyway, which also
// quiets static code analysis.
if (condition == null) {
throw new CannotBuildQueryException(joinRel, joinRel.getCondition());
}
return partialQuery.build(
JoinDataSource.create(
leftDataSource,
rightDataSource,
prefixSignaturePair.lhs,
condition.getExpression(),
toDruidJoinType(joinRel.getJoinType()),
getPlannerContext().getExprMacroTable()
),
prefixSignaturePair.rhs,
getPlannerContext(),
getCluster().getRexBuilder(),
finalizeAggregations
);
}
@Override
public DruidQuery toDruidQueryForExplaining()
{
return partialQuery.build(
DUMMY_DATA_SOURCE,
RowSignature.from(
joinRel.getRowType().getFieldNames(),
joinRel.getRowType()
),
getPlannerContext(),
getCluster().getRexBuilder(),
false
);
}
@Override
public DruidJoinQueryRel asDruidConvention()
{
return new DruidJoinQueryRel(
getCluster(),
getTraitSet().replace(DruidConvention.instance()),
joinRel.copy(
joinRel.getTraitSet(),
joinRel.getInputs()
.stream()
.map(input -> RelOptRule.convert(input, DruidConvention.instance()))
.collect(Collectors.toList())
),
partialQuery,
getQueryMaker()
);
}
@Override
public List<RelNode> getInputs()
{
return ImmutableList.of(left, right);
}
@Override
public void replaceInput(int ordinalInParent, RelNode p)
{
joinRel.replaceInput(ordinalInParent, p);
if (ordinalInParent == 0) {
this.left = p;
} else if (ordinalInParent == 1) {
this.right = p;
} else {
throw new IndexOutOfBoundsException(StringUtils.format("Invalid ordinalInParent[%s]", ordinalInParent));
}
}
@Override
public List<RexNode> getChildExps()
{
return ImmutableList.of(joinRel.getCondition());
}
@Override
public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs)
{
return new DruidJoinQueryRel(
getCluster(),
traitSet,
joinRel.copy(joinRel.getTraitSet(), inputs),
getPartialDruidQuery(),
getQueryMaker()
);
}
@Override
public Set<String> getDataSourceNames()
{
final Set<String> retVal = new HashSet<>();
retVal.addAll(((DruidRel<?>) left).getDataSourceNames());
retVal.addAll(((DruidRel<?>) right).getDataSourceNames());
return retVal;
}
@Override
public RelWriter explainTerms(RelWriter pw)
{
final String queryString;
final DruidQuery druidQuery = toDruidQueryForExplaining();
try {
queryString = getQueryMaker().getJsonMapper().writeValueAsString(druidQuery.getQuery());
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return pw.input("left", left)
.input("right", right)
.item("condition", joinRel.getCondition())
.item("joinType", joinRel.getJoinType())
.item("query", queryString)
.item("signature", druidQuery.getOutputRowSignature());
}
@Override
protected RelDataType deriveRowType()
{
return partialQuery.getRowType();
}
@Override
public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
{
return planner.getCostFactory()
.makeCost(mq.getRowCount(left), 0, 0)
.plus(planner.getCostFactory().makeCost(mq.getRowCount(right), 0, 0))
.multiplyBy(COST_FACTOR);
}
private static JoinType toDruidJoinType(JoinRelType calciteJoinType)
{
switch (calciteJoinType) {
case LEFT:
return JoinType.LEFT;
case RIGHT:
return JoinType.RIGHT;
case FULL:
return JoinType.FULL;
case INNER:
return JoinType.INNER;
default:
throw new IAE("Cannot handle joinType[%s]", calciteJoinType);
}
}
/**
* Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from
* applying that prefix.
*/
private static Pair<String, RowSignature> computeJoinRowSignature(
final RowSignature leftSignature,
final RowSignature rightSignature
)
{
final RowSignature.Builder signatureBuilder = RowSignature.builder();
for (final String column : leftSignature.getRowOrder()) {
signatureBuilder.add(column, leftSignature.getColumnType(column));
}
// Need to include the "0" since findUnusedPrefixForDigits only guarantees safety for digit-initiated suffixes
final String rightPrefix = Calcites.findUnusedPrefixForDigits("j", leftSignature.getRowOrder()) + "0.";
for (final String column : rightSignature.getRowOrder()) {
signatureBuilder.add(rightPrefix + column, rightSignature.getColumnType(column));
}
return Pair.of(rightPrefix, signatureBuilder.build());
}
}
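
To make computeJoinRowSignature concrete (hypothetical signatures, not from the patch): if the left side exposes [dim1, dim2] and the right side exposes [k, v], then findUnusedPrefixForDigits("j", ...) returns "j", the "0." is appended for the digit-safety reason noted above, and the result is:

  rightPrefix = "j0."
  joined signature: dim1, dim2, j0.k, j0.v

A join stacked on top of this one computes its prefix against names that already include j0.k, so it comes out as "_j0." (visible in the two-lookup test later in this patch).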


@ -44,7 +44,7 @@ import java.util.List;
import java.util.Set;
/**
* DruidRel that uses a "query" dataSource.
* DruidRel that uses a {@link QueryDataSource}.
*/
public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{


@ -117,7 +117,32 @@ public class DruidQuery
private final RelDataType outputRowType;
private final VirtualColumnRegistry virtualColumnRegistry;
public DruidQuery(
private DruidQuery(
final DataSource dataSource,
final PlannerContext plannerContext,
@Nullable final DimFilter filter,
@Nullable final Projection selectProjection,
@Nullable final Grouping grouping,
@Nullable final Sorting sorting,
final RowSignature sourceRowSignature,
final RelDataType outputRowType,
final VirtualColumnRegistry virtualColumnRegistry
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext");
this.filter = filter;
this.selectProjection = selectProjection;
this.grouping = grouping;
this.sorting = sorting;
this.sourceRowSignature = Preconditions.checkNotNull(sourceRowSignature, "sourceRowSignature");
this.outputRowSignature = computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, sorting);
this.outputRowType = Preconditions.checkNotNull(outputRowType, "outputRowType");
this.virtualColumnRegistry = Preconditions.checkNotNull(virtualColumnRegistry, "virtualColumnRegistry");
this.query = computeQuery();
}
public static DruidQuery fromPartialQuery(
final PartialDruidQuery partialQuery,
final DataSource dataSource,
final RowSignature sourceRowSignature,
@ -126,15 +151,17 @@ public class DruidQuery
final boolean finalizeAggregations
)
{
this.dataSource = dataSource;
this.outputRowType = partialQuery.leafRel().getRowType();
this.sourceRowSignature = sourceRowSignature;
this.virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
this.plannerContext = plannerContext;
final RelDataType outputRowType = partialQuery.leafRel().getRowType();
final VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
// Now the fun begins.
final DimFilter filter;
final Projection selectProjection;
final Grouping grouping;
final Sorting sorting;
if (partialQuery.getWhereFilter() != null) {
this.filter = Preconditions.checkNotNull(
filter = Preconditions.checkNotNull(
computeWhereFilter(
partialQuery,
plannerContext,
@ -143,55 +170,64 @@ public class DruidQuery
)
);
} else {
this.filter = null;
filter = null;
}
// Only compute "selectProjection" if this is a non-aggregating query. (For aggregating queries, "grouping" will
// reflect select-project from partialQuery on its own.)
if (partialQuery.getSelectProject() != null && partialQuery.getAggregate() == null) {
this.selectProjection = Preconditions.checkNotNull(
selectProjection = Preconditions.checkNotNull(
computeSelectProjection(
partialQuery,
plannerContext,
computeOutputRowSignature(),
computeOutputRowSignature(sourceRowSignature, null, null, null),
virtualColumnRegistry
)
);
} else {
this.selectProjection = null;
selectProjection = null;
}
if (partialQuery.getAggregate() != null) {
this.grouping = Preconditions.checkNotNull(
grouping = Preconditions.checkNotNull(
computeGrouping(
partialQuery,
plannerContext,
computeOutputRowSignature(),
computeOutputRowSignature(sourceRowSignature, selectProjection, null, null),
virtualColumnRegistry,
rexBuilder,
finalizeAggregations
)
);
} else {
this.grouping = null;
grouping = null;
}
if (partialQuery.getSort() != null) {
this.sorting = Preconditions.checkNotNull(
sorting = Preconditions.checkNotNull(
computeSorting(
partialQuery,
plannerContext,
computeOutputRowSignature(),
computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, null),
// When sorting follows grouping, virtual columns cannot be used
partialQuery.getAggregate() != null ? null : virtualColumnRegistry
)
);
} else {
this.sorting = null;
sorting = null;
}
this.outputRowSignature = computeOutputRowSignature();
this.query = computeQuery();
return new DruidQuery(
dataSource,
plannerContext,
filter,
selectProjection,
grouping,
sorting,
sourceRowSignature,
outputRowType,
virtualColumnRegistry
);
}
@Nonnull
@ -357,7 +393,7 @@ public class DruidQuery
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<DimensionExpression> dimensions = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(rowSignature.getRowOrder()));
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("d", new TreeSet<>(rowSignature.getRowOrder()));
int outputNameCounter = 0;
for (int i : aggregate.getGroupSet()) {
@ -426,7 +462,7 @@ public class DruidQuery
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<Aggregation> aggregations = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(rowSignature.getRowOrder()));
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("a", new TreeSet<>(rowSignature.getRowOrder()));
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
final String aggName = outputNamePrefix + i;
@ -525,6 +561,29 @@ public class DruidQuery
return Sorting.create(orderBys, limit, projection);
}
/**
* Return the {@link RowSignature} corresponding to the output of a query with the given parameters.
*/
private static RowSignature computeOutputRowSignature(
final RowSignature sourceRowSignature,
@Nullable final Projection selectProjection,
@Nullable final Grouping grouping,
@Nullable final Sorting sorting
)
{
if (sorting != null && sorting.getProjection() != null) {
return sorting.getProjection().getOutputRowSignature();
} else if (grouping != null) {
// Sanity check: cannot have both "grouping" and "selectProjection".
Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
return grouping.getOutputRowSignature();
} else if (selectProjection != null) {
return selectProjection.getOutputRowSignature();
} else {
return sourceRowSignature;
}
}
private VirtualColumns getVirtualColumns(final boolean includeDimensions)
{
// 'sourceRowSignature' could provide a list of all defined virtual columns while constructing a query, but we
@ -570,6 +629,11 @@ public class DruidQuery
return VirtualColumns.create(columns);
}
public DataSource getDataSource()
{
return dataSource;
}
@Nullable
public Grouping getGrouping()
{
@ -591,26 +655,6 @@ public class DruidQuery
return query;
}
/**
* Return the {@link RowSignature} corresponding to the output of this query. This method may be called during
* construction, in which case it returns the output row signature at whatever phase of construction this method
* is called at. At the end of construction, the final result is assigned to {@link #outputRowSignature}.
*/
private RowSignature computeOutputRowSignature()
{
if (sorting != null && sorting.getProjection() != null) {
return sorting.getProjection().getOutputRowSignature();
} else if (grouping != null) {
// Sanity check: cannot have both "grouping" and "selectProjection".
Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
return grouping.getOutputRowSignature();
} else if (selectProjection != null) {
return selectProjection.getOutputRowSignature();
} else {
return sourceRowSignature;
}
}
/**
* Return this query as some kind of Druid query. The returned query will either be {@link TopNQuery},
* {@link TimeseriesQuery}, {@link GroupByQuery}, {@link ScanQuery}


@ -38,7 +38,7 @@ import javax.annotation.Nonnull;
import java.util.Set;
/**
* DruidRel that uses a "table" dataSource.
* DruidRel that operates on top of a {@link DruidTable} directly (no joining or subqueries).
*/
public class DruidQueryRel extends DruidRel<DruidQueryRel>
{


@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import org.apache.druid.query.DataSource;
import org.apache.druid.sql.calcite.table.DruidTable;
import java.util.Optional;
public class DruidRels
{
/**
* Returns the DataSource involved in a leaf query of class {@link DruidQueryRel}.
*/
public static Optional<DataSource> dataSourceIfLeafRel(final DruidRel<?> druidRel)
{
if (druidRel instanceof DruidQueryRel) {
return Optional.of(druidRel.getTable().unwrap(DruidTable.class).getDataSource());
} else {
return Optional.empty();
}
}
/**
* Check if a druidRel is a simple table scan, or a projection that merely remaps columns without transforming them.
* Like {@link #isScanOrProject} but more restrictive: only remappings are allowed.
*
* @param druidRel the rel to check
* @param canBeJoin consider a 'join' that doesn't do anything fancy to be a scan-or-mapping too.
*/
public static boolean isScanOrMapping(final DruidRel<?> druidRel, final boolean canBeJoin)
{
if (isScanOrProject(druidRel, canBeJoin)) {
// Like isScanOrProject, but don't allow transforming projections.
final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery();
return partialQuery.getSelectProject() == null || partialQuery.getSelectProject().isMapping();
} else {
return false;
}
}
/**
* Check if a druidRel is a simple table scan or a scan + projection.
*
* @param druidRel the rel to check
* @param canBeJoin consider a 'join' that doesn't do anything fancy to be a scan-or-project too.
*/
private static boolean isScanOrProject(final DruidRel<?> druidRel, final boolean canBeJoin)
{
if (druidRel instanceof DruidQueryRel || (canBeJoin && druidRel instanceof DruidJoinQueryRel)) {
final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery();
final PartialDruidQuery.Stage stage = partialQuery.stage();
return (stage == PartialDruidQuery.Stage.SCAN || stage == PartialDruidQuery.Stage.SELECT_PROJECT)
&& partialQuery.getWhereFilter() == null;
} else {
return false;
}
}
}
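
As an illustrative sketch of what qualifies as a "scan or mapping" (hypothetical SQL over a table foo):

  SELECT dim1, dim2 FROM foo               -- plain scan: qualifies
  SELECT dim2 AS renamed FROM foo          -- projection that only remaps columns: qualifies
  SELECT SUBSTRING(dim1, 1, 1) FROM foo    -- projection that transforms its input: does not qualify
  SELECT dim1 FROM foo WHERE dim2 = 'a'    -- carries a WHERE filter: does not qualify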


@ -103,7 +103,7 @@ public class PartialDruidQuery
{
final Supplier<RelBuilder> builderSupplier = () -> RelFactories.LOGICAL_BUILDER.create(
scanRel.getCluster(),
scanRel.getTable().getRelOptSchema()
scanRel.getTable() != null ? scanRel.getTable().getRelOptSchema() : null
);
return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null);
}
@ -303,7 +303,14 @@ public class PartialDruidQuery
final boolean finalizeAggregations
)
{
return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations);
return DruidQuery.fromPartialQuery(
this,
dataSource,
sourceRowSignature,
plannerContext,
rexBuilder,
finalizeAggregations
);
}
public boolean canAccept(final Stage stage)


@ -197,7 +197,7 @@ public class Projection
)
{
final List<String> rowOrder = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix(
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits(
basePrefix,
new TreeSet<>(inputRowSignature.getRowOrder())
);


@ -60,7 +60,7 @@ public class VirtualColumnRegistry
{
return new VirtualColumnRegistry(
rowSignature,
Calcites.findUnusedPrefix("v", new TreeSet<>(rowSignature.getRowOrder())),
Calcites.findUnusedPrefixForDigits("v", new TreeSet<>(rowSignature.getRowOrder())),
new HashMap<>(),
new HashMap<>()
);


@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.druid.query.DataSource;
import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidRels;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
public class DruidJoinRule extends RelOptRule
{
private static final DruidJoinRule INSTANCE = new DruidJoinRule();
private DruidJoinRule()
{
super(
operand(
Join.class,
operand(DruidRel.class, none()),
operand(DruidRel.class, none())
)
);
}
public static DruidJoinRule instance()
{
return INSTANCE;
}
@Override
public boolean matches(RelOptRuleCall call)
{
final Join join = call.rel(0);
final DruidRel<?> left = call.rel(1);
final DruidRel<?> right = call.rel(2);
// 1) Condition must be handleable.
// 2) Left must be a scan or a join.
// 3) If left is not a join, it must be concrete.
// 4) Right must be a scan (and *cannot* be a join; we want to generate left-heavy trees).
// 5) Right must be global.
return
canHandleCondition(join.getCondition(), join.getLeft().getRowType())
&& DruidRels.isScanOrMapping(left, true)
&& DruidRels.isScanOrMapping(right, false)
&& (left instanceof DruidJoinQueryRel
|| DruidRels.dataSourceIfLeafRel(left).filter(DataSource::isConcrete).isPresent())
&& DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent();
}
@Override
public void onMatch(RelOptRuleCall call)
{
final Join join = call.rel(0);
final DruidRel<?> left = call.rel(1);
// Preconditions were already verified in "matches".
call.transformTo(
DruidJoinQueryRel.create(join, left.getQueryMaker())
);
}
/**
* Returns true if this condition is an AND of equality conditions of the form: f(LeftRel) = RightColumn.
*
* @see org.apache.druid.segment.join.JoinConditionAnalysis where "equiCondition" is the same concept.
*/
@VisibleForTesting
static boolean canHandleCondition(final RexNode condition, final RelDataType leftRowType)
{
final List<RexNode> subConditions = decomposeAnd(condition);
for (RexNode subCondition : subConditions) {
if (subCondition.isA(SqlKind.LITERAL)) {
// Literals are always OK.
continue;
}
if (!subCondition.isA(SqlKind.EQUALS)) {
// If it's not EQUALS, it's not supported.
return false;
}
final List<RexNode> operands = ((RexCall) subCondition).getOperands();
Preconditions.checkState(operands.size() == 2, "Expected 2 operands, got[%,d]", operands.size());
final int numLeftFields = leftRowType.getFieldList().size();
final boolean rhsIsFieldOfRightRel =
operands.get(1).isA(SqlKind.INPUT_REF)
&& ((RexInputRef) operands.get(1)).getIndex() >= numLeftFields;
final boolean lhsIsExpressionOfLeftRel =
RelOptUtil.InputFinder.bits(operands.get(0)).intersects(ImmutableBitSet.range(numLeftFields));
if (!(lhsIsExpressionOfLeftRel && rhsIsFieldOfRightRel)) {
// Cannot handle this condition.
return false;
}
}
return true;
}
@VisibleForTesting
static List<RexNode> decomposeAnd(final RexNode condition)
{
final List<RexNode> retVal = new ArrayList<>();
final Stack<RexNode> stack = new Stack<>();
stack.push(condition);
while (!stack.empty()) {
final RexNode current = stack.pop();
if (current.isA(SqlKind.AND)) {
final List<RexNode> operands = ((RexCall) current).getOperands();
// Add right-to-left, so when we unwind the stack, the operands are in the original order.
for (int i = operands.size() - 1; i >= 0; i--) {
stack.push(operands.get(i));
}
} else {
retVal.add(current);
}
}
return retVal;
}
}
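
Some illustrative join conditions (hypothetical, with foo on the left and lookup.lookyloo on the right):

  foo.dim1 = lookyloo.k                                     -- handleable: left expression = right column
  LOWER(foo.dim1) = lookyloo.k AND foo.dim2 = lookyloo.k    -- handleable: AND of such equalities
  foo.dim1 = LOWER(lookyloo.k)                              -- not handleable: right side is not a plain column reference
  foo.dim1 > lookyloo.k                                     -- not handleable: not an equality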


@ -90,7 +90,8 @@ public class DruidRules
DruidOuterQueryRule.PROJECT_AGGREGATE,
DruidOuterQueryRule.AGGREGATE_SORT_PROJECT,
DruidUnionRule.instance(),
DruidSortUnionRule.instance()
DruidSortUnionRule.instance(),
DruidJoinRule.instance()
);
}


@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.RowSignature;
import java.util.Map;
/**
* Creates the "lookup" schema in Druid SQL, composed of a table for each available {@link LookupDataSource}.
*/
public class LookupSchema extends AbstractSchema
{
public static final String NAME = "lookup";
private static final RowSignature ROW_SIGNATURE =
RowSignature.builder()
.add(LookupColumnSelectorFactory.KEY_COLUMN, ValueType.STRING)
.add(LookupColumnSelectorFactory.VALUE_COLUMN, ValueType.STRING)
.build();
private final LookupExtractorFactoryContainerProvider lookupProvider;
@Inject
public LookupSchema(final LookupExtractorFactoryContainerProvider lookupProvider)
{
this.lookupProvider = lookupProvider;
}
@Override
protected Map<String, Table> getTableMap()
{
final ImmutableMap.Builder<String, Table> tableMapBuilder = ImmutableMap.builder();
for (final String lookupName : lookupProvider.getAllLookupNames()) {
tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE));
}
return tableMapBuilder.build();
}
}
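
Each registered lookup therefore shows up as a two-column STRING table in the new "lookup" schema; for example, the test lookup used below can be queried directly:

  SELECT k, v FROM lookup.lookyloo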


@ -187,6 +187,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
testRequestLogger = new TestRequestLogger();
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
@ -827,6 +828,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -93,6 +93,7 @@ public class DruidStatementTest extends CalciteTestBase
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -32,7 +32,9 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
@ -58,11 +60,13 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlLifecycleFactory;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -94,10 +98,13 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class BaseCalciteQueryTest extends CalciteTestBase
{
public static final String NULL_VALUE = NullHandling.replaceWithDefault() ? "" : null;
public static final String NULL_STRING = NullHandling.defaultStringValue();
public static final Float NULL_FLOAT = NullHandling.defaultFloatValue();
public static final Long NULL_LONG = NullHandling.defaultLongValue();
public static final String HLLC_STRING = VersionOneHyperLogLogCollector.class.getName();
public static final Logger log = new Logger(BaseCalciteQueryTest.class);
@ -329,7 +336,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
false,
true,
null,
null,
null,
StringComparators.NUMERIC
);
}
@ -363,6 +370,29 @@ public class BaseCalciteQueryTest extends CalciteTestBase
return new ExpressionVirtualColumn(name, expression, outputType, CalciteTests.createExprMacroTable());
}
public static JoinDataSource join(
DataSource left,
DataSource right,
String rightPrefix,
String condition,
JoinType joinType
)
{
return JoinDataSource.create(
left,
right,
rightPrefix,
condition,
joinType,
CalciteTests.createExprMacroTable()
);
}
public static String equalsCondition(DruidExpression left, DruidExpression right)
{
return StringUtils.format("(%s == %s)", left.getExpression(), right.getExpression());
}
public static ExpressionPostAggregator expressionPostAgg(final String name, final String expression)
{
return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable());
@ -371,7 +401,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public static Druids.ScanQueryBuilder newScanQueryBuilder()
{
return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false);
.legacy(false);
}
@BeforeClass
@ -501,17 +531,31 @@ public class BaseCalciteQueryTest extends CalciteTestBase
testQuery(plannerConfig, QUERY_CONTEXT_DEFAULT, sql, authenticationResult, expectedQueries, expectedResults);
}
private Query recursivelyOverrideContext(final Query q, final Map<String, Object> context)
/**
* Override not just the outer query context, but also the contexts of all subqueries.
*/
private <T> Query<T> recursivelyOverrideContext(final Query<T> query, final Map<String, Object> context)
{
final Query q2;
if (q.getDataSource() instanceof QueryDataSource) {
final Query subQuery = ((QueryDataSource) q.getDataSource()).getQuery();
q2 = q.withDataSource(new QueryDataSource(recursivelyOverrideContext(subQuery, context)));
} else {
q2 = q;
}
return query.withDataSource(recursivelyOverrideContext(query.getDataSource(), context))
.withOverriddenContext(context);
}
return q2.withOverriddenContext(context);
/**
* Override the contexts of all subqueries of a particular datasource.
*/
private DataSource recursivelyOverrideContext(final DataSource dataSource, final Map<String, Object> context)
{
if (dataSource instanceof QueryDataSource) {
final Query subquery = ((QueryDataSource) dataSource).getQuery();
return new QueryDataSource(recursivelyOverrideContext(subquery, context));
} else {
return dataSource.withChildren(
dataSource.getChildren()
.stream()
.map(ds -> recursivelyOverrideContext(ds, context))
.collect(Collectors.toList())
);
}
}
public void testQuery(
@ -595,6 +639,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,


@ -32,8 +32,10 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -82,6 +84,7 @@ import org.apache.druid.query.topn.InvertedTopNMetricSpec;
import org.apache.druid.query.topn.NumericTopNMetricSpec;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
@ -318,6 +321,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
"SELECT DISTINCT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA",
ImmutableList.of(),
ImmutableList.of(
new Object[]{"lookup"},
new Object[]{"druid"},
new Object[]{"sys"},
new Object[]{"INFORMATION_SCHEMA"}
@ -344,6 +348,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
.add(new Object[]{"lookup", "lookyloo", "TABLE"})
.add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
@ -371,6 +376,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
.add(new Object[]{"lookup", "lookyloo", "TABLE"})
.add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
@ -488,7 +494,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testSelectStar() throws Exception
{
String hyperLogLogCollectorClassName = HLLC_STRING;
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
@ -504,23 +509,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, hyperLogLogCollectorClassName},
new Object[]{
timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2f, 2.0, hyperLogLogCollectorClassName
},
new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, hyperLogLogCollectorClassName},
new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, hyperLogLogCollectorClassName},
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5.0, hyperLogLogCollectorClassName},
new Object[]{
timestamp("2001-01-03"),
1L,
"abc",
NULL_VALUE,
NULL_VALUE,
6f,
6.0,
hyperLogLogCollectorClassName
}
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, HLLC_STRING},
new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING},
new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, HLLC_STRING},
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5.0, HLLC_STRING},
new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6.0, HLLC_STRING}
)
);
}
@ -617,7 +611,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1.0f, 1.0, HLLC_STRING},
new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}
new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}
)
);
}
@ -642,7 +636,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{"a"},
new Object[]{NULL_VALUE}
new Object[]{NULL_STRING}
)
);
}
@ -691,8 +685,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6d, HLLC_STRING},
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5d, HLLC_STRING}
new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6d, HLLC_STRING},
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5d, HLLC_STRING}
)
);
}
@ -718,11 +712,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1f, 1.0, HLLC_STRING},
new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_VALUE, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2f, 2.0, HLLC_STRING},
new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING},
new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4f, 4.0, HLLC_STRING},
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5f, 5.0, HLLC_STRING},
new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_VALUE, NULL_VALUE, 6f, 6.0, HLLC_STRING}
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5f, 5.0, HLLC_STRING},
new Object[]{timestamp("2001-01-03"), 1L, "abc", NULL_STRING, NULL_STRING, 6f, 6.0, HLLC_STRING}
)
);
}
@ -744,7 +738,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{"a", "a"},
new Object[]{NULL_VALUE, NULL_VALUE}
new Object[]{NULL_STRING, NULL_STRING}
)
);
}
@ -2612,10 +2606,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
new Object[]{"", NULL_VALUE},
new Object[]{"1", NULL_VALUE},
new Object[]{"", NULL_STRING},
new Object[]{"1", NULL_STRING},
new Object[]{"10.1", "0.1"},
new Object[]{"2", NULL_VALUE},
new Object[]{"2", NULL_STRING},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"}
)
@ -2664,9 +2658,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new Object[]{"10.1", "0.1"},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"},
new Object[]{"1", NULL_VALUE},
new Object[]{"2", NULL_VALUE},
new Object[]{"", NULL_VALUE}
new Object[]{"1", NULL_STRING},
new Object[]{"2", NULL_STRING},
new Object[]{"", NULL_STRING}
)
);
}
@ -2694,10 +2688,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
new Object[]{"", NULL_VALUE},
new Object[]{"1", NULL_VALUE},
new Object[]{"", NULL_STRING},
new Object[]{"1", NULL_STRING},
new Object[]{"10.1", "0.1"},
new Object[]{"2", NULL_VALUE},
new Object[]{"2", NULL_STRING},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"}
)
@ -2735,9 +2729,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new Object[]{"10.1", "0.1"},
new Object[]{"abc", "bc"},
new Object[]{"def", "ef"},
new Object[]{"1", NULL_VALUE},
new Object[]{"2", NULL_VALUE},
new Object[]{"", NULL_VALUE}
new Object[]{"1", NULL_STRING},
new Object[]{"2", NULL_STRING},
new Object[]{"", NULL_STRING}
)
);
}
@ -3452,7 +3446,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of(
new Object[]{timestamp("2000-01-01"), 1L, "", "a", "[\"a\",\"b\"]", 1.0f, 1.0d, HLLC_STRING},
new Object[]{timestamp("2001-01-01"), 1L, "1", "a", "", 4.0f, 4.0d, HLLC_STRING},
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_VALUE, 5.0f, 5.0d, HLLC_STRING}
new Object[]{timestamp("2001-01-02"), 1L, "def", "abc", NULL_STRING, 5.0f, 5.0d, HLLC_STRING}
)
);
}
@ -7601,6 +7595,416 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testFilterAndGroupByLookupUsingJoinOperatorAllowNulls() throws Exception
{
// Cannot vectorize JOIN operator.
cannotVectorize();
testQuery(
"SELECT lookyloo.v, COUNT(*)\n"
+ "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+ "WHERE lookyloo.v <> 'xa' OR lookyloo.v IS NULL\n"
+ "GROUP BY lookyloo.v",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(or(not(selector("j0.v", "xa", null)), selector("j0.v", null, null)))
.setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{NULL_STRING, 3L},
new Object[]{"xabc", 1L}
)
);
}
@Test
public void testFilterAndGroupByLookupUsingJoinOperator() throws Exception
{
// Cannot vectorize JOIN operator.
cannotVectorize();
testQuery(
"SELECT lookyloo.v, COUNT(*)\n"
+ "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+ "WHERE lookyloo.v <> 'xa'\n"
+ "GROUP BY lookyloo.v",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(not(selector("j0.v", "xa", null)))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{NULL_STRING, 3L},
new Object[]{"xabc", 1L}
)
);
}
@Test
public void testFilterAndGroupByLookupUsingJoinOperatorBackwards() throws Exception
{
// Cannot vectorize JOIN operator.
cannotVectorize();
// Like "testFilterAndGroupByLookupUsingJoinOperator", but with the table and lookup reversed.
testQuery(
"SELECT lookyloo.v, COUNT(*)\n"
+ "FROM lookup.lookyloo RIGHT JOIN foo ON foo.dim2 = lookyloo.k\n"
+ "WHERE lookyloo.v <> 'xa'\n"
+ "GROUP BY lookyloo.v",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(not(selector("j0.v", "xa", null)))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{NULL_STRING, 3L},
new Object[]{"xabc", 1L}
)
);
}
@Test
public void testGroupByInnerJoinOnLookupUsingJoinOperator() throws Exception
{
// Cannot vectorize JOIN operator.
cannotVectorize();
testQuery(
"SELECT lookyloo.v, COUNT(*)\n"
+ "FROM foo INNER JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ "GROUP BY lookyloo.v",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
JoinType.INNER
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"xabc", 1L}
)
);
}
@Test
public void testSelectOnLookupUsingInnerJoinOperator() throws Exception
{
testQuery(
"SELECT dim2, lookyloo.*\n"
+ "FROM foo INNER JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim2", "j0.k", "j0.v")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"a", "a", "xa"},
new Object[]{"a", "a", "xa"},
new Object[]{"abc", "abc", "xabc"}
)
);
}
@Test
public void testLeftJoinTwoLookupsUsingJoinOperator() throws Exception
{
testQuery(
"SELECT dim1, dim2, l1.v, l2.v\n"
+ "FROM foo\n"
+ "LEFT JOIN lookup.lookyloo l1 ON foo.dim1 = l1.k\n"
+ "LEFT JOIN lookup.lookyloo l2 ON foo.dim2 = l2.k\n",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
),
new LookupDataSource("lookyloo"),
"_j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("_j0.k")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("_j0.v", "dim1", "dim2", "j0.v")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", NULL_STRING, "xa"},
new Object[]{"10.1", NULL_STRING, NULL_STRING, NULL_STRING},
new Object[]{"2", "", NULL_STRING, NULL_STRING},
new Object[]{"1", "a", NULL_STRING, "xa"},
new Object[]{"def", "abc", NULL_STRING, "xabc"},
new Object[]{"abc", NULL_STRING, "xabc", NULL_STRING}
)
);
}
@Test
public void testLeftJoinLookupOntoLookupUsingJoinOperator() throws Exception
{
testQuery(
"SELECT dim2, l1.v, l2.v\n"
+ "FROM foo\n"
+ "LEFT JOIN lookup.lookyloo l1 ON foo.dim2 = l1.k\n"
+ "LEFT JOIN lookup.lookyloo l2 ON l1.k = l2.k",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
),
new LookupDataSource("lookyloo"),
"_j0.",
equalsCondition(DruidExpression.fromColumn("j0.k"), DruidExpression.fromColumn("_j0.k")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("_j0.v", "dim2", "j0.v")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"a", "xa", "xa"},
new Object[]{NULL_STRING, NULL_STRING, NULL_STRING},
new Object[]{"", NULL_STRING, NULL_STRING},
new Object[]{"a", "xa", "xa"},
new Object[]{"abc", "xabc", "xabc"},
new Object[]{NULL_STRING, NULL_STRING, NULL_STRING}
)
);
}
@Test
public void testLeftJoinThreeLookupsUsingJoinOperator() throws Exception
{
testQuery(
"SELECT dim1, dim2, l1.v, l2.v, l3.v\n"
+ "FROM foo\n"
+ "LEFT JOIN lookup.lookyloo l1 ON foo.dim1 = l1.k\n"
+ "LEFT JOIN lookup.lookyloo l2 ON foo.dim2 = l2.k\n"
+ "LEFT JOIN lookup.lookyloo l3 ON l2.k = l3.k",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
join(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
),
new LookupDataSource("lookyloo"),
"_j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("_j0.k")),
JoinType.LEFT
),
new LookupDataSource("lookyloo"),
"__j0.",
equalsCondition(DruidExpression.fromColumn("_j0.k"), DruidExpression.fromColumn("__j0.k")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__j0.v", "_j0.v", "dim1", "dim2", "j0.v")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", NULL_STRING, "xa", "xa"},
new Object[]{"10.1", NULL_STRING, NULL_STRING, NULL_STRING, NULL_STRING},
new Object[]{"2", "", NULL_STRING, NULL_STRING, NULL_STRING},
new Object[]{"1", "a", NULL_STRING, "xa", "xa"},
new Object[]{"def", "abc", NULL_STRING, "xabc", "xabc"},
new Object[]{"abc", NULL_STRING, "xabc", NULL_STRING, NULL_STRING}
)
);
}
@Test
public void testSelectOnLookupUsingLeftJoinOperator() throws Exception
{
testQuery(
"SELECT dim1, lookyloo.*\n"
+ "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null)))
.columns("dim1", "j0.k", "j0.v")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", NULL_STRING, NULL_STRING},
new Object[]{"10.1", NULL_STRING, NULL_STRING},
new Object[]{"2", NULL_STRING, NULL_STRING},
new Object[]{"1", NULL_STRING, NULL_STRING},
new Object[]{"def", NULL_STRING, NULL_STRING},
new Object[]{"abc", "abc", "xabc"}
)
);
}
@Test
public void testSelectOnLookupUsingRightJoinOperator() throws Exception
{
testQuery(
"SELECT dim1, lookyloo.*\n"
+ "FROM foo RIGHT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
JoinType.RIGHT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null)))
.columns("dim1", "j0.k", "j0.v")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"abc", "abc", "xabc"},
new Object[]{NULL_STRING, "a", "xa"},
new Object[]{NULL_STRING, "nosuchkey", "mysteryvalue"}
)
);
}
@Test
public void testSelectOnLookupUsingFullJoinOperator() throws Exception
{
testQuery(
"SELECT dim1, m1, cnt, lookyloo.*\n"
+ "FROM foo FULL JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k\n"
+ "WHERE lookyloo.v <> 'xxx' OR lookyloo.v IS NULL",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
JoinType.FULL
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(or(not(selector("j0.v", "xxx", null)), selector("j0.v", null, null)))
.columns("cnt", "dim1", "j0.k", "j0.v", "m1")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", 1f, 1L, NULL_STRING, NULL_STRING},
new Object[]{"10.1", 2f, 1L, NULL_STRING, NULL_STRING},
new Object[]{"2", 3f, 1L, NULL_STRING, NULL_STRING},
new Object[]{"1", 4f, 1L, NULL_STRING, NULL_STRING},
new Object[]{"def", 5f, 1L, NULL_STRING, NULL_STRING},
new Object[]{"abc", 6f, 1L, "abc", "xabc"},
new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "a", "xa"},
new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "nosuchkey", "mysteryvalue"}
)
);
}
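  // In the RIGHT and FULL join tests above, lookup keys with no matching row in "foo" (such as "nosuchkey")
  // still appear in the results, with the columns drawn from "foo" coming back as nulls.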
@Test
public void testCountDistinctOfLookup() throws Exception
{
@@ -7641,6 +8045,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testCountDistinctOfLookupUsingJoinOperator() throws Exception
{
// Cannot yet vectorize the JOIN operator.
cannotVectorize();
testQuery(
"SELECT COUNT(DISTINCT lookyloo.v)\n"
+ "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k",
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(aggregators(
new CardinalityAggregatorFactory(
"a0",
null,
ImmutableList.of(DefaultDimensionSpec.of("j0.v")),
false,
true
)
))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{NullHandling.replaceWithDefault() ? 2L : 1L}
)
);
}
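  // The expected count differs by null-handling mode: rows of "foo" without a lookup match yield a null "j0.v".
  // SQL-compatible mode excludes that null from COUNT(DISTINCT ...), giving 1; default-value mode replaces it
  // with the empty string, which is counted, giving 2.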
@Test
public void testTimeseries() throws Exception
{
@@ -10648,8 +11092,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
);
}
@Test
public void testQueryContextOuterLimit() throws Exception

View File

@@ -147,6 +147,7 @@ public class SqlResourceTest extends CalciteTestBase
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
CalciteTests.createMockLookupSchema(),
systemSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,

View File

@@ -42,14 +42,14 @@ public class CalcitesTest extends CalciteTestBase
@Test
public void testFindUnusedPrefix()
{
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar")));
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar")));
Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x")));
Assert.assertEquals("_x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x0")));
Assert.assertEquals("_x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "bar", "x4")));
Assert.assertEquals("__x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "_xbxx")));
Assert.assertEquals("x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "xa", "_x")));
Assert.assertEquals("__x", Calcites.findUnusedPrefixForDigits("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
}
}
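The assertions above pin down the contract of Calcites.findUnusedPrefixForDigits: a candidate prefix conflicts only when some existing name is the prefix immediately followed by a digit, and underscores are prepended until no conflict remains. A minimal sketch of that contract (hypothetical; the real implementation in Calcites may differ, for example by range-scanning the sorted set), assuming java.util.Set is imported:

  static String findUnusedPrefixForDigitsSketch(final String basePrefix, final Set<String> taken)
  {
    String prefix = basePrefix;
    while (clashesWithDigitSuffix(prefix, taken)) {
      prefix = "_" + prefix;
    }
    return prefix;
  }

  static boolean clashesWithDigitSuffix(final String prefix, final Set<String> taken)
  {
    for (String s : taken) {
      // A clash is the prefix followed directly by a digit: "x" clashes with "x0" but not with "xa" or " x".
      if (s.length() > prefix.length() && s.startsWith(prefix) && Character.isDigit(s.charAt(prefix.length()))) {
        return true;
      }
    }
    return false;
  }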

View File

@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
public class DruidRelsTest
{
@Test
public void test_isScanOrMapping_scan()
{
final DruidRel<?> rel = mockDruidRel(DruidQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
Assert.assertTrue(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery());
}
@Test
public void test_isScanOrMapping_scanJoin()
{
final DruidRel<?> rel = mockDruidRel(DruidJoinQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery());
}
@Test
public void test_isScanOrMapping_scanQuery()
{
final DruidRel<?> rel = mockDruidRel(DruidOuterQueryRel.class, PartialDruidQuery.Stage.SCAN, null, null);
Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery());
}
@Test
public void test_isScanOrMapping_mapping()
{
final Project project = mockProject(true);
final DruidRel<?> rel = mockDruidRel(
DruidQueryRel.class,
PartialDruidQuery.Stage.SELECT_PROJECT,
project,
null
);
Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
Assert.assertTrue(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
}
@Test
public void test_isScanOrMapping_mappingJoin()
{
final Project project = mockProject(true);
final DruidRel<?> rel = mockDruidRel(
DruidJoinQueryRel.class,
PartialDruidQuery.Stage.SELECT_PROJECT,
project,
null
);
Assert.assertTrue(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
}
@Test
public void test_isScanOrMapping_nonMapping()
{
final Project project = mockProject(false);
final DruidRel<?> rel = mockDruidRel(
DruidQueryRel.class,
PartialDruidQuery.Stage.SELECT_PROJECT,
project,
null
);
Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
}
@Test
public void test_isScanOrMapping_nonMappingJoin()
{
final Project project = mockProject(false);
final DruidRel<?> rel = mockDruidRel(
DruidJoinQueryRel.class,
PartialDruidQuery.Stage.SELECT_PROJECT,
project,
null
);
Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
}
@Test
public void test_isScanOrMapping_filterThenProject()
{
final Project project = mockProject(true);
final DruidRel<?> rel = mockDruidRel(
DruidQueryRel.class,
PartialDruidQuery.Stage.SELECT_PROJECT,
project,
mockFilter()
);
Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
}
@Test
public void test_isScanOrMapping_filterThenProjectJoin()
{
final Project project = mockProject(true);
final DruidRel<?> rel = mockDruidRel(
DruidJoinQueryRel.class,
PartialDruidQuery.Stage.SELECT_PROJECT,
project,
mockFilter()
);
Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
}
@Test
public void test_isScanOrMapping_filter()
{
final DruidRel<?> rel = mockDruidRel(
DruidQueryRel.class,
PartialDruidQuery.Stage.WHERE_FILTER,
null,
mockFilter()
);
Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery());
}
@Test
public void test_isScanOrMapping_filterJoin()
{
final DruidRel<?> rel = mockDruidRel(
DruidJoinQueryRel.class,
PartialDruidQuery.Stage.WHERE_FILTER,
null,
mockFilter()
);
Assert.assertFalse(DruidRels.isScanOrMapping(rel, true));
Assert.assertFalse(DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery());
}
@Test
public void test_isScanOrMapping_allStages()
{
final ImmutableSet<PartialDruidQuery.Stage> okStages = ImmutableSet.of(
PartialDruidQuery.Stage.SCAN,
PartialDruidQuery.Stage.SELECT_PROJECT
);
for (PartialDruidQuery.Stage stage : PartialDruidQuery.Stage.values()) {
final Project project = mockProject(true);
final DruidRel<?> rel = mockDruidRel(
DruidQueryRel.class,
stage,
project,
null
);
Assert.assertEquals(stage.toString(), okStages.contains(stage), DruidRels.isScanOrMapping(rel, true));
Assert.assertEquals(stage.toString(), okStages.contains(stage), DruidRels.isScanOrMapping(rel, false));
EasyMock.verify(rel, rel.getPartialDruidQuery(), project);
}
}
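  // Taken together, these cases pin down isScanOrMapping: it accepts a DruidQueryRel (or, when the boolean
  // argument permits joins, a DruidJoinQueryRel) whose PartialDruidQuery is a bare SCAN, or a SELECT_PROJECT
  // whose Project is a pure column mapping with no filter attached; DruidOuterQueryRel is always rejected.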
private static DruidRel<?> mockDruidRel(
final Class<? extends DruidRel<?>> clazz,
final PartialDruidQuery.Stage stage,
@Nullable Project selectProject,
@Nullable Filter whereFilter
)
{
// DruidQueryRels rely on a ton of Calcite stuff like RelOptCluster, RelOptTable, etc, which is quite verbose to
// create real instances of. So, tragically, we'll use EasyMock.
final DruidRel<?> mockRel = EasyMock.mock(clazz);
final PartialDruidQuery mockPartialQuery = EasyMock.mock(PartialDruidQuery.class);
EasyMock.expect(mockPartialQuery.stage()).andReturn(stage).anyTimes();
EasyMock.expect(mockPartialQuery.getSelectProject()).andReturn(selectProject).anyTimes();
EasyMock.expect(mockPartialQuery.getWhereFilter()).andReturn(whereFilter).anyTimes();
EasyMock.expect(mockRel.getPartialDruidQuery()).andReturn(mockPartialQuery).anyTimes();
EasyMock.replay(mockRel, mockPartialQuery);
return mockRel;
}
private static Project mockProject(final boolean mapping)
{
final Project mockProject = EasyMock.mock(Project.class);
EasyMock.expect(mockProject.isMapping()).andReturn(mapping).anyTimes();
EasyMock.replay(mockProject);
return mockProject;
}
private static Filter mockFilter()
{
return EasyMock.mock(Filter.class);
}
}

View File

@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.junit.Assert;
import org.junit.Test;
import java.math.BigDecimal;
import java.util.List;
public class DruidJoinRuleTest
{
private final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
private final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE);
private final RelDataType leftType =
new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE).createStructType(
ImmutableList.of(typeFactory.createSqlType(SqlTypeName.VARCHAR)),
ImmutableList.of("left")
);
private final RelDataType joinType =
new SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE).createStructType(
ImmutableList.of(
typeFactory.createSqlType(SqlTypeName.VARCHAR),
typeFactory.createSqlType(SqlTypeName.VARCHAR)
),
ImmutableList.of("left", "right")
);
@Test
public void test_canHandleCondition_leftEqRight()
{
Assert.assertTrue(
DruidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(joinType, 0),
rexBuilder.makeInputRef(joinType, 1)
),
leftType
)
);
}
@Test
public void test_canHandleCondition_leftFnEqRight()
{
Assert.assertTrue(
DruidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeCall(
SqlStdOperatorTable.CONCAT,
rexBuilder.makeLiteral("foo"),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
),
leftType
)
);
}
@Test
public void test_canHandleCondition_leftEqRightFn()
{
Assert.assertFalse(
DruidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
rexBuilder.makeCall(
SqlStdOperatorTable.CONCAT,
rexBuilder.makeLiteral("foo"),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
)
),
leftType
)
);
}
@Test
public void test_canHandleCondition_leftEqLeft()
{
Assert.assertFalse(
DruidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
),
leftType
)
);
}
@Test
public void test_canHandleCondition_rightEqRight()
{
Assert.assertFalse(
DruidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
),
leftType
)
);
}
@Test
public void test_canHandleCondition_true()
{
Assert.assertTrue(
DruidJoinRule.canHandleCondition(
rexBuilder.makeLiteral(true),
leftType
)
);
}
@Test
public void test_canHandleCondition_false()
{
Assert.assertTrue(
DruidJoinRule.canHandleCondition(
rexBuilder.makeLiteral(false),
leftType
)
);
}
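  // Taken together, the canHandleCondition cases show which join conditions the rule accepts: literals, and
  // equalities whose left-hand side is an expression over left-input columns only and whose right-hand side is
  // a direct reference to a right-input column. Expressions on the right side, or equalities confined to a
  // single input, are rejected.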
@Test
public void test_decomposeAnd_notAnAnd()
{
final List<RexNode> rexNodes = DruidJoinRule.decomposeAnd(rexBuilder.makeInputRef(leftType, 0));
Assert.assertEquals(1, rexNodes.size());
Assert.assertEquals(rexBuilder.makeInputRef(leftType, 0), Iterables.getOnlyElement(rexNodes));
}
@Test
public void test_decomposeAnd_basic()
{
final List<RexNode> decomposed = DruidJoinRule.decomposeAnd(
rexBuilder.makeCall(
SqlStdOperatorTable.AND,
rexBuilder.makeCall(
SqlStdOperatorTable.AND,
rexBuilder.makeExactLiteral(BigDecimal.valueOf(1)),
rexBuilder.makeExactLiteral(BigDecimal.valueOf(2))
),
rexBuilder.makeCall(
SqlStdOperatorTable.AND,
rexBuilder.makeExactLiteral(BigDecimal.valueOf(3)),
rexBuilder.makeExactLiteral(BigDecimal.valueOf(4))
)
)
);
Assert.assertEquals(
ImmutableList.of(
rexBuilder.makeExactLiteral(BigDecimal.valueOf(1)),
rexBuilder.makeExactLiteral(BigDecimal.valueOf(2)),
rexBuilder.makeExactLiteral(BigDecimal.valueOf(3)),
rexBuilder.makeExactLiteral(BigDecimal.valueOf(4))
),
decomposed
);
}
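  // A sketch consistent with the two decomposeAnd cases above (hypothetical; the rule's actual implementation
  // may differ): flatten nested AND calls while preserving left-to-right operand order, and return any non-AND
  // node as a singleton list. Assumes imports of org.apache.calcite.rex.RexCall, org.apache.calcite.sql.SqlKind,
  // java.util.ArrayDeque, java.util.ArrayList and java.util.Deque.
  private static List<RexNode> decomposeAndSketch(final RexNode condition)
  {
    final List<RexNode> retVal = new ArrayList<>();
    final Deque<RexNode> stack = new ArrayDeque<>();
    stack.push(condition);
    while (!stack.isEmpty()) {
      final RexNode current = stack.pop();
      if (current.isA(SqlKind.AND)) {
        // Push operands right to left, so they pop (and are emitted) in their original order.
        final List<RexNode> operands = ((RexCall) current).getOperands();
        for (int i = operands.size() - 1; i >= 0; i--) {
          stack.push(operands.get(i));
        }
      } else {
        retVal.add(current);
      }
    }
    return retVal;
  }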
}

View File

@@ -118,6 +118,7 @@ import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.LookupSchema;
import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.view.NoopViewManager;
@@ -720,7 +721,10 @@ public class CalciteTests
.buildMMappedIndex();
return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
return new SpecificSegmentsQuerySegmentWalker(
conglomerate,
INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class)
).add(
DataSegment.builder()
.dataSource(DATASOURCE1)
.interval(index1.getDataInterval())
@@ -857,6 +861,10 @@ public class CalciteTests
).get(0);
}
public static LookupSchema createMockLookupSchema()
{
return new LookupSchema(INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class));
}
public static SystemSchema createMockSystemSchema(
final DruidSchema druidSchema,

View File

@@ -19,58 +19,168 @@
package org.apache.druid.sql.calcite.util;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* A self-contained class that executes queries similarly to the normal Druid query stack.
*
* {@link ClientQuerySegmentWalker}, the same class that Brokers use as the entry point for their query stack, is
* used directly. Our own class {@link DataServerLikeWalker} mimics the behavior of
* {@link org.apache.druid.server.coordination.ServerManager}, the entry point for Historicals. That class isn't used
* directly because the sheer volume of dependencies makes it quite verbose to use in a test environment.
*/
public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, Closeable
{
private final QueryRunnerFactoryConglomerate conglomerate;
private final QuerySegmentWalker walker;
private final JoinableFactory joinableFactory;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>();
private final List<Closeable> closeables = new ArrayList<>();
private final List<DataSegment> segments = new ArrayList<>();
public SpecificSegmentsQuerySegmentWalker(QueryRunnerFactoryConglomerate conglomerate)
/**
* Create an instance using the provided query runner factory conglomerate and lookup provider.
*/
public SpecificSegmentsQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider
)
{
this.conglomerate = conglomerate;
this.joinableFactory = new MapJoinableFactory(
ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
.put(InlineDataSource.class, new InlineJoinableFactory())
.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
.build()
);
this.walker = new ClientQuerySegmentWalker(
new NoopServiceEmitter(),
new DataServerLikeWalker(),
new QueryToolChestWarehouse()
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
return conglomerate.findFactory(query).getToolchest();
}
},
new RetryQueryRunnerConfig(),
TestHelper.makeJsonMapper(),
new ServerConfig(),
null /* Cache */,
new CacheConfig()
{
@Override
public boolean isPopulateCache()
{
return false;
}
@Override
public boolean isUseCache()
{
return false;
}
@Override
public boolean isPopulateResultLevelCache()
{
return false;
}
@Override
public boolean isUseResultLevelCache()
{
return false;
}
}
);
}
/**
* Create an instance without any lookups.
*/
public SpecificSegmentsQuerySegmentWalker(final QueryRunnerFactoryConglomerate conglomerate)
{
this(
conglomerate,
new LookupExtractorFactoryContainerProvider()
{
@Override
public Set<String> getAllLookupNames()
{
return Collections.emptySet();
}
@Override
public Optional<LookupExtractorFactoryContainer> get(String lookupName)
{
return Optional.empty();
}
}
);
}
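  // Example wiring (mirroring the CalciteTests change earlier in this diff): tests that need lookup joins pass
  // a real provider, e.g.
  //   new SpecificSegmentsQuerySegmentWalker(conglomerate, lookupProvider).add(dataSegment, queryableIndex);
  // while this convenience constructor keeps older tests, which use no lookup datasources, working unchanged.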
public SpecificSegmentsQuerySegmentWalker add(
@@ -79,8 +189,10 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
)
{
final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines
.computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural()));
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.computeIfAbsent(
descriptor.getDataSource(),
datasource -> new VersionedIntervalTimeline<>(Ordering.natural())
);
timeline.add(
descriptor.getInterval(),
descriptor.getVersion(),
@@ -102,70 +214,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final Iterable<Interval> intervals
)
{
Query<T> newQuery = query;
if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) {
newQuery = (Query<T>) Druids.ScanQueryBuilder.copy((ScanQuery) query)
.intervals(new MultipleSpecificSegmentSpec(ImmutableList.of()))
.build();
}
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(newQuery);
if (factory == null) {
throw new ISE("Unknown query type[%s].", newQuery.getClass());
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner<>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
(queryPlus, responseContext) -> {
Query<T> query1 = queryPlus.getQuery();
Query<T> newQuery1 = query1;
if (query instanceof ScanQuery && ((ScanQuery) query).getOrder() != ScanQuery.Order.NONE) {
newQuery1 = (Query<T>) Druids.ScanQueryBuilder.copy((ScanQuery) query)
.intervals(new MultipleSpecificSegmentSpec(
ImmutableList.of(new SegmentDescriptor(
Intervals.of("2015-04-12/2015-04-13"),
"4",
0
))))
.context(ImmutableMap.of(
ScanQuery.CTX_KEY_OUTERMOST,
false
))
.build();
}
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = getTimelineForTableDataSource(
newQuery1);
return makeBaseRunner(
newQuery1,
toolChest,
factory,
FunctionalIterable
.create(intervals)
.transformCat(
interval -> timeline.lookup(interval)
)
.transformCat(
holder -> FunctionalIterable
.create(holder.getObject())
.transform(
chunk -> new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
chunk.getChunkNumber()
)
)
)
).run(QueryPlus.wrap(newQuery1), responseContext);
}
)
)
),
toolChest
);
return walker.getQueryRunnerForIntervals(query, intervals);
}
@Override
@@ -174,23 +223,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final Iterable<SegmentDescriptor> specs
)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner<>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
makeBaseRunner(query, toolChest, factory, specs)
)
)
),
toolChest
);
return walker.getQueryRunnerForSegments(query, specs);
}
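  // Both entry points above now delegate to the ClientQuerySegmentWalker built in the constructor, which
  // decorates and merges results the way a Broker does before routing concrete table queries to the
  // DataServerLikeWalker below, replacing the hand-rolled runner assembly that used to live here.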
@Override
@@ -201,52 +234,182 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
}
}
private <T> VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimelineForTableDataSource(Query<T> query)
{
  if (query.getDataSource() instanceof TableDataSource) {
    return timelines.get(((TableDataSource) query.getDataSource()).getName());
  } else {
    throw new UOE("DataSource type[%s] unsupported", query.getDataSource().getClass().getName());
  }
}
private <T> QueryRunner<T> makeBaseRunner(
    final Query<T> query,
    final QueryToolChest<T, Query<T>> toolChest,
    final QueryRunnerFactory<T, Query<T>> factory,
    final Iterable<SegmentDescriptor> specs
)
{
  final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = getTimelineForTableDataSource(query);
  if (timeline == null) {
    return new NoopQueryRunner<>();
  }
  return new FinalizeResultsQueryRunner<>(
      toolChest.mergeResults(
          factory.mergeRunners(
              Execs.directExecutor(),
              FunctionalIterable
                  .create(specs)
                  .transformCat(
                      descriptor -> {
                        final PartitionHolder<ReferenceCountingSegment> holder = timeline.findEntry(
                            descriptor.getInterval(),
                            descriptor.getVersion()
                        );
                        return Iterables.transform(
                            holder,
                            chunk -> new SpecificSegmentQueryRunner<T>(
                                factory.createRunner(chunk.getObject()),
                                new SpecificSegmentSpec(descriptor)
                            )
                        );
                      }
                  )
          )
      ),
      toolChest
  );
}
private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Interval interval)
{
  final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
  if (timeline == null) {
    return Collections.emptyList();
  } else {
    final List<WindowedSegment> retVal = new ArrayList<>();
    for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : timeline.lookup(interval)) {
      for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) {
        retVal.add(new WindowedSegment(chunk.getObject(), holder.getInterval()));
      }
    }
    return retVal;
  }
}
private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Iterable<SegmentDescriptor> specs)
{
  final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
  if (timeline == null) {
    return Collections.emptyList();
  } else {
    final List<WindowedSegment> retVal = new ArrayList<>();
    for (SegmentDescriptor spec : specs) {
      final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
          spec.getInterval(),
          spec.getVersion()
      );
      retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval()));
    }
    return retVal;
  }
}
public static class WindowedSegment
{
  private final Segment segment;
  private final Interval interval;
  public WindowedSegment(Segment segment, Interval interval)
  {
    this.segment = segment;
    this.interval = interval;
    Preconditions.checkArgument(segment.getId().getInterval().contains(interval));
  }
  public Segment getSegment()
  {
    return segment;
  }
  public Interval getInterval()
  {
    return interval;
  }
  public SegmentDescriptor getDescriptor()
  {
    return new SegmentDescriptor(interval, segment.getId().getVersion(), segment.getId().getPartitionNum());
  }
}
/**
 * Mimics the behavior of a data server (e.g. Historical).
 *
 * Compare to {@link org.apache.druid.server.SegmentManager}.
 */
private class DataServerLikeWalker implements QuerySegmentWalker
{
  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
  {
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
    if (!analysis.isConcreteTableBased()) {
      throw new ISE("Cannot handle datasource: %s", query.getDataSource());
    }
    final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
    FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
        .create(intervals)
        .transformCat(interval -> getSegmentsForTable(dataSourceName, interval))
        .transform(WindowedSegment::getDescriptor);
    return getQueryRunnerForSegments(query, segmentDescriptors);
  }
  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
  {
    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
    if (factory == null) {
      throw new ISE("Unknown query type[%s].", query.getClass());
    }
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
    if (!analysis.isConcreteTableBased()) {
      throw new ISE("Cannot handle datasource: %s", query.getDataSource());
    }
    final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
    // Make sure this query type can handle the subquery, if present.
    if (analysis.isQuery()
        && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
      throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
    }
    final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
        analysis.getPreJoinableClauses(),
        joinableFactory,
        new AtomicLong()
    );
    final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
        toolChest.postMergeQueryDecoration(
            toolChest.mergeResults(
                toolChest.preMergeQueryDecoration(
                    makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
                )
            )
        ),
        toolChest
    );
    // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
    // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
    // to function properly.
    return (theQuery, responseContext) -> baseRunner.run(
        theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))),
        responseContext
    );
  }
  private <T> QueryRunner<T> makeTableRunner(
      final QueryToolChest<T, Query<T>> toolChest,
      final QueryRunnerFactory<T, Query<T>> factory,
      final Iterable<WindowedSegment> segments,
      final Function<Segment, Segment> segmentMapFn
  )
  {
    final List<WindowedSegment> segmentsList = Lists.newArrayList(segments);
    if (segmentsList.isEmpty()) {
      // Note: this is not correct when there's a right or full outer join going on.
      // See https://github.com/apache/druid/issues/9229 for details.
      return new NoopQueryRunner<>();
    }
    return new FinalizeResultsQueryRunner<>(
        toolChest.mergeResults(
            factory.mergeRunners(
                Execs.directExecutor(),
                FunctionalIterable
                    .create(segmentsList)
                    .transform(
                        segment ->
                            new SpecificSegmentQueryRunner<>(
                                factory.createRunner(segmentMapFn.apply(segment.getSegment())),
                                new SpecificSegmentSpec(segment.getDescriptor())
                            )
                    )
            )
        ),
        toolChest
    );
  }
}
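// Where the join actually happens: Joinables.createSegmentMapFn is an identity mapping for plain table
// queries, but when the analyzed datasource carries pre-joinable clauses (such as a lookup join) it wraps
// each base segment so that its cursors expose the joined columns under their "j0."-style prefixes.
// makeTableRunner applies that mapping per segment before handing the segment to the runner factory.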
}