Add flag in SQL to disable left base filter optimization for joins (#10947)

* Add flag to disable left base filter

* code coverage

* Draft

* Review comments

* code coverage

* add docs

* Add old tests
This commit is contained in:
Abhishek Agarwal 2021-03-10 02:37:34 +05:30 committed by GitHub
parent 4dd22a850b
commit c66951a59e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 588 additions and 34 deletions

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common;
import javax.annotation.Nullable;
import java.util.Objects;
public class Triple<T1, T2, T3>
{
@Nullable
public final T1 first;
@Nullable
public final T2 second;
@Nullable
public final T3 third;
public Triple(@Nullable T1 first, @Nullable T2 second, @Nullable T3 third)
{
this.first = first;
this.second = second;
this.third = third;
}
public static <T1, T2, T3> Triple<T1, T2, T3> of(@Nullable T1 first, @Nullable T2 second, @Nullable T3 third)
{
return new Triple<>(first, second, third);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Triple<?, ?, ?> triple = (Triple<?, ?, ?>) o;
return Objects.equals(first, triple.first)
&& Objects.equals(second, triple.second)
&& Objects.equals(third, triple.third);
}
@Override
public int hashCode()
{
return Objects.hash(first, second, third);
}
@Override
public String toString()
{
return "Triple{" +
"first=" + first +
", second=" + second +
", third=" + third +
'}';
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common;
import org.junit.Assert;
import org.junit.Test;
public class TripleTest
{
@Test
public void test()
{
Triple one = new Triple("first", "second", "third");
Triple second = new Triple("first", "second", "third");
Assert.assertEquals(second, one);
Assert.assertEquals(second.hashCode(), one.hashCode());
Assert.assertEquals("Triple{first=first, second=second, third=third}", one.toString());
}
@Test
public void testUnequals()
{
Triple left = new Triple("first", "second", "third");
Assert.assertNotEquals(new Triple<>(null, "second", "third"), left);
Assert.assertNotEquals(new Triple<>("abc", "second", "third"), left);
Assert.assertNotEquals(new Triple<>("first", null, "third"), left);
Assert.assertNotEquals(new Triple<>("first", "abc", "third"), left);
Assert.assertNotEquals(new Triple<>("first", "second", null), left);
Assert.assertNotEquals(new Triple<>("first", "second", "abc"), left);
}
}

View File

@ -59,6 +59,7 @@ These parameters apply to all query types.
|parallelMergeSmallBatchRows|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details.|
|useFilterCNF|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects.|
|secondaryPartitionPruning|`true`|Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions.|
|enableJoinLeftTableScanDirect|`false`|This flag applies to queries which have joins. For joins, where left child is a simple scan with a filter, by default, druid will run the scan as a query and the join the results to the right child on broker. Setting this flag to true overrides that behavior and druid will attempt to push the join to data servers instead. Please note that the flag could be applicable to queries even if there is no explicit join. since queries can internally translated into a join by the SQL planner.|
## Query-type-specific parameters

View File

@ -129,21 +129,6 @@ public class JoinDataSource implements DataSource
return new JoinDataSource(left, right, rightPrefix, conditionAnalysis, joinType, leftFilter);
}
/**
* Create a join dataSource from an existing {@link JoinConditionAnalysis}.
*/
public static JoinDataSource create(
final DataSource left,
final DataSource right,
final String rightPrefix,
final String condition,
final JoinType joinType,
final ExprMacroTable macroTable
)
{
return create(left, right, rightPrefix, condition, joinType, null, macroTable);
}
@Override
public Set<String> getTableNames()
{

View File

@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@PublicApi
@ -54,6 +55,10 @@ public class QueryContexts
public static final String JOIN_FILTER_REWRITE_ENABLE_KEY = "enableJoinFilterRewrite";
public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";
// This flag control whether a sql join query with left scan should be attempted to be run as direct table access
// instead of being wrapped inside a query. With direct table access enabled, druid can push down the join operation to
// data servers.
public static final String SQL_JOIN_LEFT_SCAN_DIRECT = "enableJoinLeftTableScanDirect";
public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY = "numRetriesOnMissingSegments";
public static final String RETURN_PARTIAL_RESULTS_KEY = "returnPartialResults";
@ -76,6 +81,7 @@ public class QueryContexts
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000;
public static final boolean DEFAULT_ENABLE_SQL_JOIN_LEFT_SCAN_DIRECT = false;
public static final boolean DEFAULT_USE_FILTER_CNF = false;
public static final boolean DEFAULT_SECONDARY_PARTITION_PRUNING = true;
@ -292,6 +298,11 @@ public class QueryContexts
return parseBoolean(query, JOIN_FILTER_REWRITE_ENABLE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE);
}
public static <T> boolean getEnableJoinLeftScanDirect(Map<String, Object> context)
{
return parseBoolean(context, SQL_JOIN_LEFT_SCAN_DIRECT, DEFAULT_ENABLE_SQL_JOIN_LEFT_SCAN_DIRECT);
}
public static <T> boolean isSecondaryPartitionPruningEnabled(Query<T> query)
{
return parseBoolean(query, SECONDARY_PARTITION_PRUNING_KEY, DEFAULT_SECONDARY_PARTITION_PRUNING);
@ -404,6 +415,12 @@ public class QueryContexts
return val == null ? defaultValue : Numbers.parseBoolean(val);
}
static boolean parseBoolean(Map<String, Object> context, String key, boolean defaultValue)
{
final Object val = context.get(key);
return val == null ? defaultValue : Numbers.parseBoolean(val);
}
private QueryContexts()
{
}

View File

@ -20,7 +20,7 @@
package org.apache.druid.query.planning;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Triple;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
@ -126,8 +126,8 @@ public class DataSourceAnalysis
}
if (current instanceof JoinDataSource) {
final Pair<Pair<DataSource, DimFilter>, List<PreJoinableClause>> flattened = flattenJoin((JoinDataSource) current);
return new DataSourceAnalysis(dataSource, flattened.lhs.lhs, baseQuery, flattened.lhs.rhs, flattened.rhs);
final Triple<DataSource, DimFilter, List<PreJoinableClause>> flattened = flattenJoin((JoinDataSource) current);
return new DataSourceAnalysis(dataSource, flattened.first, baseQuery, flattened.second, flattened.third);
} else {
return new DataSourceAnalysis(dataSource, current, baseQuery, null, Collections.emptyList());
}
@ -139,7 +139,7 @@ public class DataSourceAnalysis
*
* @throws IllegalArgumentException if dataSource cannot be fully flattened.
*/
private static Pair<Pair<DataSource, DimFilter>, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
private static Triple<DataSource, DimFilter, List<PreJoinableClause>> flattenJoin(final JoinDataSource dataSource)
{
DataSource current = dataSource;
DimFilter currentDimFilter = null;
@ -166,7 +166,7 @@ public class DataSourceAnalysis
// going-up order. So reverse them.
Collections.reverse(preJoinableClauses);
return Pair.of(Pair.of(current, currentDimFilter), preJoinableClauses);
return Triple.of(current, currentDimFilter, preJoinableClauses);
}
/**

View File

@ -63,6 +63,7 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
/**
* @param baseAdapter A StorageAdapter for the left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysis Pre-analysis for the query we expect to run on this storage adapter
*/
HashJoinSegmentStorageAdapter(
@ -78,6 +79,7 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
* @param baseAdapter A StorageAdapter for the left-hand side base segment
* @param baseFilter A filter for the left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysis Pre-analysis for the query we expect to run on this storage adapter
*/
HashJoinSegmentStorageAdapter(

View File

@ -50,6 +50,7 @@ public class JoinDataSourceTest
"j.",
"x == \"j.x\"",
JoinType.LEFT,
null,
ExprMacroTable.nil()
);
@ -59,6 +60,7 @@ public class JoinDataSourceTest
"j.",
"x == \"j.x\"",
JoinType.LEFT,
null,
ExprMacroTable.nil()
);

View File

@ -131,4 +131,18 @@ public class QueryContextsTest
);
Assert.assertTrue(QueryContexts.isSecondaryPartitionPruningEnabled(query));
}
@Test
public void testGetEnableJoinLeftScanDirect()
{
Assert.assertFalse(QueryContexts.getEnableJoinLeftScanDirect(ImmutableMap.of()));
Assert.assertTrue(QueryContexts.getEnableJoinLeftScanDirect(ImmutableMap.of(
QueryContexts.SQL_JOIN_LEFT_SCAN_DIRECT,
true
)));
Assert.assertFalse(QueryContexts.getEnableJoinLeftScanDirect(ImmutableMap.of(
QueryContexts.SQL_JOIN_LEFT_SCAN_DIRECT,
false
)));
}
}

View File

@ -499,6 +499,7 @@ public class ClientQuerySegmentWalkerTest
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
null,
ExprMacroTable.nil()
)
)
@ -566,6 +567,7 @@ public class ClientQuerySegmentWalkerTest
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
null,
ExprMacroTable.nil()
)
)
@ -750,6 +752,7 @@ public class ClientQuerySegmentWalkerTest
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
null,
ExprMacroTable.nil()
)
)

View File

@ -242,7 +242,7 @@ public class Rules
.addAll(baseRuleSet(plannerContext))
.add(DruidRelToDruidRule.instance())
.add(new DruidTableScanRule(queryMaker))
.addAll(DruidRules.rules());
.addAll(DruidRules.rules(plannerContext));
return retVal.build();
}

View File

@ -42,6 +42,7 @@ import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
import org.apache.druid.sql.calcite.rel.DruidQueryRel;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
@ -54,9 +55,12 @@ import java.util.stream.Collectors;
public class DruidJoinRule extends RelOptRule
{
private static final DruidJoinRule INSTANCE = new DruidJoinRule();
private static final DruidJoinRule INSTANCE = new DruidJoinRule(true);
private static final DruidJoinRule LEFT_SCAN_AS_QUERY = new DruidJoinRule(false);
private DruidJoinRule()
private final boolean enableLeftScanDirect;
private DruidJoinRule(final boolean enableLeftScanDirect)
{
super(
operand(
@ -65,11 +69,12 @@ public class DruidJoinRule extends RelOptRule
operand(DruidRel.class, any())
)
);
this.enableLeftScanDirect = enableLeftScanDirect;
}
public static DruidJoinRule instance()
public static DruidJoinRule instance(boolean enableLeftScanDirect)
{
return INSTANCE;
return enableLeftScanDirect ? INSTANCE : LEFT_SCAN_AS_QUERY;
}
@Override
@ -104,8 +109,10 @@ public class DruidJoinRule extends RelOptRule
// Already verified to be present in "matches", so just call "get".
// Can't be final, because we're going to reassign it up to a couple of times.
ConditionAnalysis conditionAnalysis = analyzeCondition(join.getCondition(), join.getLeft().getRowType()).get();
final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left instanceof DruidQueryRel);
if (left.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT) {
if (left.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT
&& (isLeftDirectAccessPossible || left.getPartialDruidQuery().getWhereFilter() == null)) {
// Swap the left-side projection above the join, so the left side is a simple scan or mapping. This helps us
// avoid subqueries.
final RelNode leftScan = left.getPartialDruidQuery().getScan();

View File

@ -30,6 +30,8 @@ import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidOuterQueryRel;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
@ -46,8 +48,9 @@ public class DruidRules
// No instantiation.
}
public static List<RelOptRule> rules()
public static List<RelOptRule> rules(PlannerContext plannerContext)
{
boolean enableLeftScanDirect = QueryContexts.getEnableJoinLeftScanDirect(plannerContext.getQueryContext());
return ImmutableList.of(
new DruidQueryRule<>(
Filter.class,
@ -92,7 +95,7 @@ public class DruidRules
DruidUnionRule.instance(),
DruidUnionDataSourceRule.instance(),
DruidSortUnionRule.instance(),
DruidJoinRule.instance()
DruidJoinRule.instance(enableLeftScanDirect)
);
}

View File

@ -877,4 +877,12 @@ public class BaseCalciteQueryTest extends CalciteTestBase
};
}
}
protected Map<String, Object> withLeftDirectAccessEnabled(Map<String, Object> context)
{
// since context is usually immutable in tests, make a copy
HashMap<String, Object> newContext = new HashMap<>(context);
newContext.put(QueryContexts.SQL_JOIN_LEFT_SCAN_DIRECT, true);
return newContext;
}
}

View File

@ -100,6 +100,7 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
public void testCorrelatedSubquery(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"select country, ANY_VALUE(\n"
@ -205,6 +206,7 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
public void testCorrelatedSubqueryWithLeftFilter(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"select country, ANY_VALUE(\n"
@ -282,11 +284,97 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testCorrelatedSubqueryWithLeftFilter_leftDirectAccessDisabled(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();
testQuery(
"select country, ANY_VALUE(\n"
+ " select max(\"users\") from (\n"
+ " select floor(__time to day), count(*) \"users\" from visits f where f.country = visits.country group by 1\n"
+ " )\n"
+ " ) as \"dailyVisits\"\n"
+ "from visits \n"
+ " where city = 'B' and __time between '2021-01-01 01:00:00' AND '2021-01-02 23:59:59'"
+ " group by 1",
queryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
join(
new QueryDataSource(newScanQueryBuilder().dataSource("visits")
.intervals(querySegmentSpec(Intervals.of(
"2021-01-01T01:00:00.000Z/2021-01-02T23:59:59.001Z")))
.filters(selector("city", "B", null))
.columns("__time", "city", "country")
.build()),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource("visits")
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
"timestamp_floor(\"__time\",'P1D',null,'UTC')",
ValueType.LONG,
TestExprMacroTable.INSTANCE
))
.setDimFilter(not(selector("country", null, null)))
.setDimensions(
new DefaultDimensionSpec(
"v0",
"d0",
ValueType.LONG
),
new DefaultDimensionSpec(
"country",
"d1"
)
)
.setAggregatorSpecs(new CountAggregatorFactory("a0"))
.setContext(queryContext)
.setGranularity(new AllGranularity())
.build()
)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setDimensions(new DefaultDimensionSpec("d1", "_d0"))
.setAggregatorSpecs(
new LongMaxAggregatorFactory("_a0", "a0")
)
.setGranularity(new AllGranularity())
.setContext(queryContext)
.build()
),
"j0.",
equalsCondition(
DruidExpression.fromColumn("country"),
DruidExpression.fromColumn("j0._d0")
),
JoinType.LEFT
)
)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setDimensions(new DefaultDimensionSpec("country", "d0"))
.setAggregatorSpecs(new LongAnyAggregatorFactory("a0", "j0._a0"))
.setGranularity(new AllGranularity())
.setContext(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"canada", 4L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testCorrelatedSubqueryWithCorrelatedQueryFilter(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"select country, ANY_VALUE(\n"
@ -374,7 +462,7 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
public void testCorrelatedSubqueryWithCorrelatedQueryFilter_Scan(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"select country, ANY_VALUE(\n"
+ " select max(\"users\") from (\n"

View File

@ -84,6 +84,7 @@ import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.RegexDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
@ -5470,8 +5471,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testViewAndJoin() throws Exception
{
cannotVectorize();
Map<String, Object> queryContext = withLeftDirectAccessEnabled(QUERY_CONTEXT_DEFAULT);
testQuery(
"SELECT COUNT(*) FROM view.cview as a INNER JOIN druid.foo d on d.dim2 = a.dim2 WHERE a.dim1_firstchar <> 'z' ",
queryContext,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(
@ -5482,7 +5485,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
newScanQueryBuilder().dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim2")
.context(QUERY_CONTEXT_DEFAULT)
.context(queryContext)
.build()
),
"j0.",
@ -5494,7 +5497,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
newScanQueryBuilder().dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim2")
.context(QUERY_CONTEXT_DEFAULT)
.context(queryContext)
.build()
),
"_j0.",
@ -5506,7 +5509,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.filters(not(selector("dim1", "z", new SubstringDimExtractionFn(0, 1))))
.granularity(Granularities.ALL)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(TIMESERIES_CONTEXT_DEFAULT)
.context(withLeftDirectAccessEnabled(TIMESERIES_CONTEXT_DEFAULT))
.build()
),
ImmutableList.of(
@ -13333,7 +13336,15 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.builder()
.setDataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Intervals.of("2001-01-02T00:00:00.000Z/146140482-04-24T15:36:27.903Z")))
.columns("dim1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@ -13352,7 +13363,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
)
.setGranularity(Granularities.ALL)
.setInterval(querySegmentSpec(Intervals.of("2001-01-02T00:00:00.000Z/146140482-04-24T15:36:27.903Z")))
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim1", "def", null))
.setDimensions(
dimensions(
@ -16124,6 +16135,76 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSourcesWithTimeFilter(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1' AND \"__time\" >= '1999'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 LEFT JOIN abc as t2 on t1.dim1 = t2.dim1 WHERE t1.dim1 = '10.1'\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(
querySegmentSpec(
Intervals.utc(
DateTimes.of("1999-01-01").getMillis(),
JodaUtils.MAX_INSTANT
)
)
)
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(
querySegmentSpec(
Intervals.utc(
DateTimes.of("1999-01-01").getMillis(),
JodaUtils.MAX_INSTANT
)
)
)
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.v0")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.filters(new SelectorDimFilter("v0", "10.1", null))
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSourcesWithTimeFilter_withLeftDirectAccess(Map<String, Object> queryContext) throws Exception
{
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"with abc as\n"
+ "(\n"
@ -16181,6 +16262,61 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSourcesWithOuterWhere(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 LEFT JOIN abc as t2 on t1.dim1 = t2.dim1 WHERE t1.dim1 = '10.1'\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.filters(new SelectorDimFilter("v0", "10.1", null))
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSourcesWithOuterWhere_withLeftDirectAccess(Map<String, Object> queryContext) throws Exception
{
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"with abc as\n"
+ "(\n"
@ -16225,6 +16361,60 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSources(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 LEFT JOIN abc as t2 on t1.dim1 = t2.dim1\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSources_withLeftDirectAccess(Map<String, Object> queryContext) throws Exception
{
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"with abc as\n"
+ "(\n"
@ -16269,6 +16459,61 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinOnTwoInlineDataSourcesWithOuterWhere(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 INNER JOIN abc as t2 on t1.dim1 = t2.dim1 WHERE t1.dim1 = '10.1'\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.filters(new NotDimFilter(new SelectorDimFilter("v0", null, null)))
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinOnTwoInlineDataSourcesWithOuterWhere_withLeftDirectAccess(Map<String, Object> queryContext) throws Exception
{
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"with abc as\n"
+ "(\n"
@ -16313,6 +16558,60 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinOnTwoInlineDataSources(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 INNER JOIN abc as t2 on t1.dim1 = t2.dim1\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinOnTwoInlineDataSources_withLeftDirectAccess(Map<String, Object> queryContext) throws Exception
{
queryContext = withLeftDirectAccessEnabled(queryContext);
testQuery(
"with abc as\n"
+ "(\n"

View File

@ -1422,6 +1422,7 @@ skipEmptyBuckets
useCache
useResultLevelCache
vectorSize
enableJoinLeftTableScanDirect
- ../docs/querying/querying.md
DatasourceMetadata
TimeBoundary