Optimize join queries where filter matches nothing (#9931)

* Refactor JoinFilterAnalyzer

This patch attempts to make it easier to follow the join filter analysis code
with the hope of making it easier to add rewrite optimizations in the future.

To keep the patch small and easy to review, this is the first of at least 2
patches that are planned.

This patch adds a builder to the Pre-Analysis, so that it is easier to
instantiate the preAnalysis. It also moves some of the filter normalization
code out to Fitlers with associated tests.

* fix tests

* Refactor JoinFilterAnalyzer - part 2

This change introduces the following components:
 * RhsRewriteCandidates - a wrapper for a list of candidates and associated
     functions to operate on the set of candidates.
 * JoinableClauses - a wrapper for the list of JoinableClause that represent
     a join condition and the associated functions to operate on the clauses.
 * Equiconditions - a wrapper representing the equiconditions that are used
     in the join condition.

And associated test changes.

This refactoring surfaced 2 bugs:
 - Missing equals and hashcode implementation for RhsRewriteCandidate, thus
   allowing potential duplicates in the rhs rewrite candidates
 - Missing Filter#supportsRequiredColumnRewrite check in
   analyzeJoinFilterClause, which could result in UnsupportedOperationException
   being thrown by the filter

* fix compile error

* remove unused class

* Refactor JoinFilterAnalyzer - Correlations

Move the correlation related code out into it's own class so it's easier
to maintain.
Another patch should follow this one so that the query path uses the
correlation object instead of it's underlying maps.

* Optimize join queries where filter matches nothing

Fixes #9787

This PR changes the Joinable interface to return an Optional set of correlated
values for a column.
This allows the JoinFilterAnalyzer to differentiate between the case where the
column has no matching values and when the column could not find matching
values.

This PR chose not to distinguish between cases where correlated values could
not be computed because of a config that has this behavior disabled or because
of user error - like a column that could not be found. The reasoning was that
the latter is likely an error and the non filter pushdown path will surface the
error if it is.
This commit is contained in:
Suneet Saldanha 2020-05-29 16:53:03 -07:00 committed by GitHub
parent 9c40bebc02
commit e03d38b6c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 600 additions and 432 deletions

View File

@ -24,6 +24,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
@ -88,9 +89,11 @@ public interface Joinable
* returned than this limit, return an empty set.
* @param allowNonKeyColumnSearch If true, allow searchs on non-key columns. If this is false,
* a search on a non-key column should return an empty set.
* @return The set of correlated column values. If we cannot determine correlated values, return an empty set.
* @return The set of correlated column values. If we cannot determine correlated values, return absent.
*
* In case either the search or retrieval column names are not found, this will return absent.
*/
Set<String> getCorrelatedColumnValues(
Optional<Set<String>> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName,

View File

@ -28,21 +28,16 @@ import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.FalseFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.join.Equality;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidate;
import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidates;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -147,131 +142,16 @@ public class JoinFilterAnalyzer
// build the equicondition map, used for determining how the tables are connected through joins
Equiconditions equiconditions = preAnalysisBuilder.computeEquiconditionsFromJoinableClauses();
RhsRewriteCandidates rhsRewriteCandidates =
RhsRewriteCandidates.getRhsRewriteCandidates(normalizedJoinTableClauses, equiconditions, joinableClauses);
JoinFilterCorrelations correlations = JoinFilterCorrelations.computeJoinFilterCorrelations(
normalizedJoinTableClauses,
equiconditions,
joinableClauses,
enableRewriteValueColumnFilters,
filterRewriteMaxSize
);
// Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis based on the RHS rewrite candidates
Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> correlationsByPrefix = new HashMap<>();
Map<String, Optional<JoinFilterColumnCorrelationAnalysis>> directRewriteCorrelations = new HashMap<>();
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) {
if (rhsRewriteCandidate.isDirectRewrite()) {
directRewriteCorrelations.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
c -> {
Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> correlatedBaseTableColumns =
findCorrelatedBaseTableColumns(
joinableClauses,
c,
rhsRewriteCandidate,
equiconditions
);
if (!correlatedBaseTableColumns.isPresent()) {
return Optional.empty();
} else {
JoinFilterColumnCorrelationAnalysis baseColumnAnalysis = correlatedBaseTableColumns.get().get(c);
// for direct rewrites, there will only be one analysis keyed by the RHS column
assert (baseColumnAnalysis != null);
return Optional.of(correlatedBaseTableColumns.get().get(c));
}
}
);
} else {
correlationsByPrefix.computeIfAbsent(
rhsRewriteCandidate.getJoinableClause().getPrefix(),
p -> findCorrelatedBaseTableColumns(
joinableClauses,
p,
rhsRewriteCandidate,
equiconditions
)
);
}
}
// Using the RHS table prefix -> JoinFilterColumnCorrelationAnalysis created in the previous step,
// build a map of rhsFilterColumn -> Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues for specific filter pair
// The Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues mappings are stored in the
// JoinFilterColumnCorrelationAnalysis objects, which are shared across all rhsFilterColumn entries that belong
// to the same RHS table.
//
// The value is a List<JoinFilterColumnCorrelationAnalysis> instead of a single value because a table can be joined
// to another via multiple columns.
// (See JoinFilterAnalyzerTest.test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS for an example)
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn = new LinkedHashMap<>();
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn = new LinkedHashMap<>();
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) {
if (rhsRewriteCandidate.isDirectRewrite()) {
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByDirectFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> new ArrayList<>()
);
perColumnCorrelations.add(
directRewriteCorrelations.get(rhsRewriteCandidate.getRhsColumn()).get()
);
continue;
}
Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> correlationsForPrefix = correlationsByPrefix.get(
rhsRewriteCandidate.getJoinableClause().getPrefix()
);
if (correlationsForPrefix.isPresent()) {
for (Map.Entry<String, JoinFilterColumnCorrelationAnalysis> correlationForPrefix : correlationsForPrefix.get()
.entrySet()) {
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> new ArrayList<>()
);
perColumnCorrelations.add(correlationForPrefix.getValue());
correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent(
Pair.of(rhsRewriteCandidate.getRhsColumn(), rhsRewriteCandidate.getValueForRewrite()),
(rhsVal) -> {
Set<String> correlatedValues = getCorrelatedValuesForPushDown(
rhsRewriteCandidate.getRhsColumn(),
rhsRewriteCandidate.getValueForRewrite(),
correlationForPrefix.getValue().getJoinColumn(),
rhsRewriteCandidate.getJoinableClause(),
enableRewriteValueColumnFilters,
filterRewriteMaxSize
);
if (correlatedValues.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(correlatedValues);
}
}
);
}
} else {
correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), null);
}
}
// Go through each per-column analysis list and prune duplicates
for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> correlation : correlationsByFilteringColumn
.entrySet()) {
if (correlation.getValue() != null) {
List<JoinFilterColumnCorrelationAnalysis> dedupList = eliminateCorrelationDuplicates(
correlation.getValue()
);
correlationsByFilteringColumn.put(correlation.getKey(), dedupList);
}
}
for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> correlation : correlationsByDirectFilteringColumn
.entrySet()) {
if (correlation.getValue() != null) {
List<JoinFilterColumnCorrelationAnalysis> dedupList = eliminateCorrelationDuplicates(
correlation.getValue()
);
correlationsByDirectFilteringColumn.put(correlation.getKey(), dedupList);
}
}
preAnalysisBuilder.withCorrelationsByFilteringColumn(correlationsByFilteringColumn)
.withCorrelationsByDirectFilteringColumn(correlationsByDirectFilteringColumn);
return preAnalysisBuilder.build();
return preAnalysisBuilder.withCorrelations(correlations)
.build();
}
/**
@ -561,10 +441,20 @@ public class JoinFilterAnalyzer
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
Set<String> newFilterValues = correlatedValues.get();
// in nothing => match nothing
if (newFilterValues.isEmpty()) {
return new JoinFilterAnalysis(
true,
selectorFilter,
FalseFilter.instance()
);
}
for (String correlatedBaseColumn : correlationAnalysis.getBaseColumns()) {
Filter rewrittenFilter = new InDimFilter(
correlatedBaseColumn,
correlatedValues.get(),
newFilterValues,
null,
null
).toFilter();
@ -587,7 +477,7 @@ public class JoinFilterAnalyzer
Filter rewrittenFilter = new InDimFilter(
pushDownVirtualColumn.getOutputName(),
correlatedValues.get(),
newFilterValues,
null,
null
).toFilter();
@ -613,218 +503,6 @@ public class JoinFilterAnalyzer
return PUSH_DOWN_VIRTUAL_COLUMN_NAME_BASE + counter;
}
/**
* Helper method for rewriting filters on join table columns into filters on base table columns.
*
* @param filterColumn A join table column that we're filtering on
* @param filterValue The value to filter on
* @param correlatedJoinColumn A join table column that appears as the RHS of an equicondition, which we can correlate
* with a column on the base table
* @param clauseForFilteredTable The joinable clause that corresponds to the join table being filtered on
*
* @return A list of values of the correlatedJoinColumn that appear in rows where filterColumn = filterValue
* Returns an empty set if we cannot determine the correlated values.
*/
private static Set<String> getCorrelatedValuesForPushDown(
String filterColumn,
String filterValue,
String correlatedJoinColumn,
JoinableClause clauseForFilteredTable,
boolean enableRewriteValueColumnFilters,
long filterRewriteMaxSize
)
{
String filterColumnNoPrefix = filterColumn.substring(clauseForFilteredTable.getPrefix().length());
String correlatedColumnNoPrefix = correlatedJoinColumn.substring(clauseForFilteredTable.getPrefix().length());
return clauseForFilteredTable.getJoinable().getCorrelatedColumnValues(
filterColumnNoPrefix,
filterValue,
correlatedColumnNoPrefix,
filterRewriteMaxSize,
enableRewriteValueColumnFilters
);
}
/**
* For each rhs column that appears in the equiconditions for a table's JoinableClause,
* we try to determine what base table columns are related to the rhs column through the total set of equiconditions.
* We do this by searching backwards through the chain of join equiconditions using the provided equicondition map.
*
* For example, suppose we have 3 tables, A,B,C, joined with the following conditions, where A is the base table:
* A.joinColumn == B.joinColumn
* B.joinColum == C.joinColumn
*
* We would determine that C.joinColumn is correlated with A.joinColumn: we first see that
* C.joinColumn is linked to B.joinColumn which in turn is linked to A.joinColumn
*
* Suppose we had the following join conditions instead:
* f(A.joinColumn) == B.joinColumn
* B.joinColum == C.joinColumn
* In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn would be linked to f(A.joinColumn).
*
* Suppose we had the following join conditions instead:
* A.joinColumn == B.joinColumn
* f(B.joinColum) == C.joinColumn
*
* Because we cannot reverse the function f() applied to the second table B in all cases,
* we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn
*
* @param joinableClauses List of joinable clauses for the query
* @param tablePrefix Prefix for a join table
* @param rhsRewriteCandidate RHS rewrite candidate that we find correlated base table columns for
* @param equiConditions Map of equiconditions, keyed by the right hand columns
*
* @return A list of correlatation analyses for the equicondition RHS columns that reside in the table associated with
* the tablePrefix
*/
private static Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> findCorrelatedBaseTableColumns(
JoinableClauses joinableClauses,
String tablePrefix,
RhsRewriteCandidate rhsRewriteCandidate,
Equiconditions equiConditions
)
{
JoinableClause clauseForTablePrefix = rhsRewriteCandidate.getJoinableClause();
JoinConditionAnalysis jca = clauseForTablePrefix.getCondition();
Set<String> rhsColumns = new HashSet<>();
if (rhsRewriteCandidate.isDirectRewrite()) {
// If we filter on a RHS join column, we only need to consider that column from the RHS side
rhsColumns.add(rhsRewriteCandidate.getRhsColumn());
} else {
for (Equality eq : jca.getEquiConditions()) {
rhsColumns.add(tablePrefix + eq.getRightColumn());
}
}
Map<String, JoinFilterColumnCorrelationAnalysis> correlations = new LinkedHashMap<>();
for (String rhsColumn : rhsColumns) {
Set<String> correlatedBaseColumns = new HashSet<>();
Set<Expr> correlatedBaseExpressions = new HashSet<>();
getCorrelationForRHSColumn(
joinableClauses,
equiConditions,
rhsColumn,
correlatedBaseColumns,
correlatedBaseExpressions
);
if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) {
continue;
}
correlations.put(
rhsColumn,
new JoinFilterColumnCorrelationAnalysis(
rhsColumn,
correlatedBaseColumns,
correlatedBaseExpressions
)
);
}
if (correlations.size() == 0) {
return Optional.empty();
} else {
return Optional.of(correlations);
}
}
/**
* Helper method for {@link #findCorrelatedBaseTableColumns} that determines correlated base table columns
* and/or expressions for a single RHS column and adds them to the provided sets as it traverses the
* equicondition column relationships.
*
* @param equiConditions Map of equiconditions, keyed by the right hand columns
* @param rhsColumn RHS column to find base table correlations for
* @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified.
* @param correlatedBaseExpressions Set of correlated base column expressions for the provided RHS column. Will be
* modified.
*/
private static void getCorrelationForRHSColumn(
JoinableClauses joinableClauses,
Equiconditions equiConditions,
String rhsColumn,
Set<String> correlatedBaseColumns,
Set<Expr> correlatedBaseExpressions
)
{
String findMappingFor = rhsColumn;
Set<Expr> lhsExprs = equiConditions.getLhsExprs(findMappingFor);
if (lhsExprs == null) {
return;
}
for (Expr lhsExpr : lhsExprs) {
String identifier = lhsExpr.getBindingIfIdentifier();
if (identifier == null) {
// We push down if the function only requires base table columns
Expr.BindingDetails bindingDetails = lhsExpr.analyzeInputs();
Set<String> requiredBindings = bindingDetails.getRequiredBindings();
if (joinableClauses.areSomeColumnsFromJoin(requiredBindings)) {
break;
}
correlatedBaseExpressions.add(lhsExpr);
} else {
// simple identifier, see if we can correlate it with a column on the base table
findMappingFor = identifier;
if (joinableClauses.getColumnFromJoinIfExists(identifier) == null) {
correlatedBaseColumns.add(findMappingFor);
} else {
getCorrelationForRHSColumn(
joinableClauses,
equiConditions,
findMappingFor,
correlatedBaseColumns,
correlatedBaseExpressions
);
}
}
}
}
/**
* Given a list of JoinFilterColumnCorrelationAnalysis, prune the list so that we only have one
* JoinFilterColumnCorrelationAnalysis for each unique combination of base columns.
*
* Suppose we have a join condition like the following, where A is the base table:
* A.joinColumn == B.joinColumn && A.joinColumn == B.joinColumn2
*
* We only need to consider one correlation to A.joinColumn since B.joinColumn and B.joinColumn2 must
* have the same value in any row that matches the join condition.
*
* In the future this method could consider which column correlation should be preserved based on availability of
* indices and other heuristics.
*
* When push down of filters with LHS expressions in the join condition is supported, this method should also
* consider expressions.
*
* @param originalList Original list of column correlation analyses.
*
* @return Pruned list of column correlation analyses.
*/
private static List<JoinFilterColumnCorrelationAnalysis> eliminateCorrelationDuplicates(
List<JoinFilterColumnCorrelationAnalysis> originalList
)
{
Map<Set<String>, JoinFilterColumnCorrelationAnalysis> uniquesMap = new HashMap<>();
for (JoinFilterColumnCorrelationAnalysis jca : originalList) {
Set<String> mapKey = new HashSet<>(jca.getBaseColumns());
for (Expr expr : jca.getBaseExpressions()) {
mapKey.add(expr.stringify());
}
uniquesMap.put(mapKey, jca);
}
return new ArrayList<>(uniquesMap.values());
}
private static boolean isColumnFromPostJoinVirtualColumns(
List<VirtualColumn> postJoinVirtualColumns,
String column

View File

@ -0,0 +1,407 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.join.Equality;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidate;
import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidates;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* A wrapper class for correlation analyses of different filters involved in the query. It contains:
*
* - A mapping of RHS filtering columns -> List<JoinFilterColumnCorrelationAnalysis>, used for filter rewrites
* - A second mapping of RHS filtering columns -> List<JoinFilterColumnCorrelationAnalysis>, used for direct filter rewrites
*/
public class JoinFilterCorrelations
{
private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn;
private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn;
private JoinFilterCorrelations(
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn,
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn
)
{
this.correlationsByFilteringColumn = correlationsByFilteringColumn;
this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn;
}
public Map<String, List<JoinFilterColumnCorrelationAnalysis>> getCorrelationsByFilteringColumn()
{
return correlationsByFilteringColumn;
}
public Map<String, List<JoinFilterColumnCorrelationAnalysis>> getCorrelationsByDirectFilteringColumn()
{
return correlationsByDirectFilteringColumn;
}
public static JoinFilterCorrelations computeJoinFilterCorrelations(
List<Filter> normalizedJoinTableClauses,
Equiconditions equiconditions,
JoinableClauses joinableClauses,
boolean enableRewriteValueColumnFilters,
long filterRewriteMaxSize
)
{
RhsRewriteCandidates rhsRewriteCandidates =
RhsRewriteCandidates.getRhsRewriteCandidates(normalizedJoinTableClauses, equiconditions, joinableClauses);
// Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis based on the RHS rewrite candidates
Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> correlationsByPrefix = new HashMap<>();
Map<String, Optional<JoinFilterColumnCorrelationAnalysis>> directRewriteCorrelations = new HashMap<>();
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) {
if (rhsRewriteCandidate.isDirectRewrite()) {
directRewriteCorrelations.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
c -> {
Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> correlatedBaseTableColumns =
findCorrelatedBaseTableColumns(
joinableClauses,
c,
rhsRewriteCandidate,
equiconditions
);
if (!correlatedBaseTableColumns.isPresent()) {
return Optional.empty();
} else {
JoinFilterColumnCorrelationAnalysis baseColumnAnalysis = correlatedBaseTableColumns.get().get(c);
// for direct rewrites, there will only be one analysis keyed by the RHS column
assert (baseColumnAnalysis != null);
return Optional.of(correlatedBaseTableColumns.get().get(c));
}
}
);
} else {
correlationsByPrefix.computeIfAbsent(
rhsRewriteCandidate.getJoinableClause().getPrefix(),
p -> findCorrelatedBaseTableColumns(
joinableClauses,
p,
rhsRewriteCandidate,
equiconditions
)
);
}
}
// Using the RHS table prefix -> JoinFilterColumnCorrelationAnalysis created in the previous step,
// build a map of rhsFilterColumn -> Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues for specific filter pair
// The Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues mappings are stored in the
// JoinFilterColumnCorrelationAnalysis objects, which are shared across all rhsFilterColumn entries that belong
// to the same RHS table.
//
// The value is a List<JoinFilterColumnCorrelationAnalysis> instead of a single value because a table can be joined
// to another via multiple columns.
// (See JoinFilterAnalyzerTest.test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS for an example)
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn = new LinkedHashMap<>();
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn = new LinkedHashMap<>();
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) {
if (rhsRewriteCandidate.isDirectRewrite()) {
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByDirectFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> new ArrayList<>()
);
perColumnCorrelations.add(
directRewriteCorrelations.get(rhsRewriteCandidate.getRhsColumn()).get()
);
continue;
}
Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> correlationsForPrefix = correlationsByPrefix.get(
rhsRewriteCandidate.getJoinableClause().getPrefix()
);
if (correlationsForPrefix.isPresent()) {
for (Map.Entry<String, JoinFilterColumnCorrelationAnalysis> correlationForPrefix : correlationsForPrefix.get()
.entrySet()) {
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> new ArrayList<>()
);
perColumnCorrelations.add(correlationForPrefix.getValue());
correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent(
Pair.of(rhsRewriteCandidate.getRhsColumn(), rhsRewriteCandidate.getValueForRewrite()),
(rhsVal) -> {
Optional<Set<String>> correlatedValues = getCorrelatedValuesForPushDown(
rhsRewriteCandidate.getRhsColumn(),
rhsRewriteCandidate.getValueForRewrite(),
correlationForPrefix.getValue().getJoinColumn(),
rhsRewriteCandidate.getJoinableClause(),
enableRewriteValueColumnFilters,
filterRewriteMaxSize
);
return correlatedValues;
}
);
}
} else {
correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), null);
}
}
// Go through each per-column analysis list and prune duplicates
for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> correlation : correlationsByFilteringColumn
.entrySet()) {
if (correlation.getValue() != null) {
List<JoinFilterColumnCorrelationAnalysis> dedupList = eliminateCorrelationDuplicates(
correlation.getValue()
);
correlationsByFilteringColumn.put(correlation.getKey(), dedupList);
}
}
for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> correlation : correlationsByDirectFilteringColumn
.entrySet()) {
if (correlation.getValue() != null) {
List<JoinFilterColumnCorrelationAnalysis> dedupList = eliminateCorrelationDuplicates(
correlation.getValue()
);
correlationsByDirectFilteringColumn.put(correlation.getKey(), dedupList);
}
}
return new JoinFilterCorrelations(correlationsByFilteringColumn, correlationsByDirectFilteringColumn);
}
/**
* Given a list of JoinFilterColumnCorrelationAnalysis, prune the list so that we only have one
* JoinFilterColumnCorrelationAnalysis for each unique combination of base columns.
* <p>
* Suppose we have a join condition like the following, where A is the base table:
* A.joinColumn == B.joinColumn && A.joinColumn == B.joinColumn2
* <p>
* We only need to consider one correlation to A.joinColumn since B.joinColumn and B.joinColumn2 must
* have the same value in any row that matches the join condition.
* <p>
* In the future this method could consider which column correlation should be preserved based on availability of
* indices and other heuristics.
* <p>
* When push down of filters with LHS expressions in the join condition is supported, this method should also
* consider expressions.
*
* @param originalList Original list of column correlation analyses.
* @return Pruned list of column correlation analyses.
*/
private static List<JoinFilterColumnCorrelationAnalysis> eliminateCorrelationDuplicates(
List<JoinFilterColumnCorrelationAnalysis> originalList
)
{
Map<Set<String>, JoinFilterColumnCorrelationAnalysis> uniquesMap = new HashMap<>();
for (JoinFilterColumnCorrelationAnalysis jca : originalList) {
Set<String> mapKey = new HashSet<>(jca.getBaseColumns());
for (Expr expr : jca.getBaseExpressions()) {
mapKey.add(expr.stringify());
}
uniquesMap.put(mapKey, jca);
}
return new ArrayList<>(uniquesMap.values());
}
/**
* Helper method for rewriting filters on join table columns into filters on base table columns.
*
* @param filterColumn A join table column that we're filtering on
* @param filterValue The value to filter on
* @param correlatedJoinColumn A join table column that appears as the RHS of an equicondition, which we can correlate
* with a column on the base table
* @param clauseForFilteredTable The joinable clause that corresponds to the join table being filtered on
* @return A list of values of the correlatedJoinColumn that appear in rows where filterColumn = filterValue
* Returns absent if we cannot determine the correlated values.
*/
private static Optional<Set<String>> getCorrelatedValuesForPushDown(
String filterColumn,
String filterValue,
String correlatedJoinColumn,
JoinableClause clauseForFilteredTable,
boolean enableRewriteValueColumnFilters,
long filterRewriteMaxSize
)
{
String filterColumnNoPrefix = filterColumn.substring(clauseForFilteredTable.getPrefix().length());
String correlatedColumnNoPrefix = correlatedJoinColumn.substring(clauseForFilteredTable.getPrefix().length());
return clauseForFilteredTable.getJoinable().getCorrelatedColumnValues(
filterColumnNoPrefix,
filterValue,
correlatedColumnNoPrefix,
filterRewriteMaxSize,
enableRewriteValueColumnFilters
);
}
/**
* For each rhs column that appears in the equiconditions for a table's JoinableClause,
* we try to determine what base table columns are related to the rhs column through the total set of equiconditions.
* We do this by searching backwards through the chain of join equiconditions using the provided equicondition map.
* <p>
* For example, suppose we have 3 tables, A,B,C, joined with the following conditions, where A is the base table:
* A.joinColumn == B.joinColumn
* B.joinColum == C.joinColumnenableRewriteValueColumnFilters
* <p>
* We would determine that C.joinColumn is correlated with A.joinColumn: we first see that
* C.joinColumn is linked to B.joinColumn which in turn is linked to A.joinColumn
* <p>
* Suppose we had the following join conditions instead:
* f(A.joinColumn) == B.joinColumn
* B.joinColum == C.joinColumn
* In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn would be linked to f(A.joinColumn).
* <p>
* Suppose we had the following join conditions instead:
* A.joinColumn == B.joinColumn
* f(B.joinColum) == C.joinColumn
* <p>
* Because we cannot reverse the function f() applied to the second table B in all cases,
* we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn
*
* @param joinableClauses List of joinable clauses for the query
* @param tablePrefix Prefix for a join table
* @param rhsRewriteCandidate RHS rewrite candidate that we find correlated base table columns for
* @param equiConditions Map of equiconditions, keyed by the right hand columns
* @return A list of correlatation analyses for the equicondition RHS columns that reside in the table associated with
* the tablePrefix
*/
private static Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> findCorrelatedBaseTableColumns(
JoinableClauses joinableClauses,
String tablePrefix,
RhsRewriteCandidate rhsRewriteCandidate,
Equiconditions equiConditions
)
{
JoinableClause clauseForTablePrefix = rhsRewriteCandidate.getJoinableClause();
JoinConditionAnalysis jca = clauseForTablePrefix.getCondition();
Set<String> rhsColumns = new HashSet<>();
if (rhsRewriteCandidate.isDirectRewrite()) {
// If we filter on a RHS join column, we only need to consider that column from the RHS side
rhsColumns.add(rhsRewriteCandidate.getRhsColumn());
} else {
for (Equality eq : jca.getEquiConditions()) {
rhsColumns.add(tablePrefix + eq.getRightColumn());
}
}
Map<String, JoinFilterColumnCorrelationAnalysis> correlations = new LinkedHashMap<>();
for (String rhsColumn : rhsColumns) {
Set<String> correlatedBaseColumns = new HashSet<>();
Set<Expr> correlatedBaseExpressions = new HashSet<>();
getCorrelationForRHSColumn(
joinableClauses,
equiConditions,
rhsColumn,
correlatedBaseColumns,
correlatedBaseExpressions
);
if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) {
continue;
}
correlations.put(
rhsColumn,
new JoinFilterColumnCorrelationAnalysis(
rhsColumn,
correlatedBaseColumns,
correlatedBaseExpressions
)
);
}
if (correlations.size() == 0) {
return Optional.empty();
} else {
return Optional.of(correlations);
}
}
/**
* Helper method for {@link #findCorrelatedBaseTableColumns} that determines correlated base table columns
* and/or expressions for a single RHS column and adds them to the provided sets as it traverses the
* equicondition column relationships.
*
* @param equiConditions Map of equiconditions, keyed by the right hand columns
* @param rhsColumn RHS column to find base table correlations for
* @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified.
* @param correlatedBaseExpressions Set of correlated base column expressions for the provided RHS column. Will be
* modified.
*/
private static void getCorrelationForRHSColumn(
JoinableClauses joinableClauses,
Equiconditions equiConditions,
String rhsColumn,
Set<String> correlatedBaseColumns,
Set<Expr> correlatedBaseExpressions
)
{
String findMappingFor = rhsColumn;
Set<Expr> lhsExprs = equiConditions.getLhsExprs(findMappingFor);
if (lhsExprs == null) {
return;
}
for (Expr lhsExpr : lhsExprs) {
String identifier = lhsExpr.getBindingIfIdentifier();
if (identifier == null) {
// We push down if the function only requires base table columns
Expr.BindingDetails bindingDetails = lhsExpr.analyzeInputs();
Set<String> requiredBindings = bindingDetails.getRequiredBindings();
if (joinableClauses.areSomeColumnsFromJoin(requiredBindings)) {
break;
}
correlatedBaseExpressions.add(lhsExpr);
} else {
// simple identifier, see if we can correlate it with a column on the base table
findMappingFor = identifier;
if (joinableClauses.getColumnFromJoinIfExists(identifier) == null) {
correlatedBaseColumns.add(findMappingFor);
} else {
getCorrelationForRHSColumn(
joinableClauses,
equiConditions,
findMappingFor,
correlatedBaseColumns,
correlatedBaseExpressions
);
}
}
}
}
}

View File

@ -41,8 +41,6 @@ import java.util.Set;
* - The query's original filter (if any)
* - A list of filter clauses from the original filter's CNF representation that only reference the base table
* - A list of filter clauses from the original filter's CNF representation that reference RHS join tables
* - A mapping of RHS filtering columns -> List<JoinFilterColumnCorrelationAnalysis>, used for filter rewrites
* - A second mapping of RHS filtering columns -> List<JoinFilterColumnCorrelationAnalysis>, used for direct filter rewrites
* - A list of virtual columns that can only be computed post-join
* - Control flag booleans for whether filter push down and RHS rewrites are enabled.
*/
@ -52,8 +50,7 @@ public class JoinFilterPreAnalysis
private final Filter originalFilter;
private final List<Filter> normalizedBaseTableClauses;
private final List<Filter> normalizedJoinTableClauses;
private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn;
private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn;
private final JoinFilterCorrelations correlations;
private final boolean enableFilterPushDown;
private final boolean enableFilterRewrite;
private final List<VirtualColumn> postJoinVirtualColumns;
@ -65,8 +62,7 @@ public class JoinFilterPreAnalysis
final List<VirtualColumn> postJoinVirtualColumns,
final List<Filter> normalizedBaseTableClauses,
final List<Filter> normalizedJoinTableClauses,
final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn,
final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn,
JoinFilterCorrelations correlations,
final boolean enableFilterPushDown,
final boolean enableFilterRewrite,
final Equiconditions equiconditions
@ -77,8 +73,7 @@ public class JoinFilterPreAnalysis
this.postJoinVirtualColumns = postJoinVirtualColumns;
this.normalizedBaseTableClauses = normalizedBaseTableClauses;
this.normalizedJoinTableClauses = normalizedJoinTableClauses;
this.correlationsByFilteringColumn = correlationsByFilteringColumn;
this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn;
this.correlations = correlations;
this.enableFilterPushDown = enableFilterPushDown;
this.enableFilterRewrite = enableFilterRewrite;
this.equiconditions = equiconditions;
@ -111,12 +106,12 @@ public class JoinFilterPreAnalysis
public Map<String, List<JoinFilterColumnCorrelationAnalysis>> getCorrelationsByFilteringColumn()
{
return correlationsByFilteringColumn;
return correlations.getCorrelationsByFilteringColumn();
}
public Map<String, List<JoinFilterColumnCorrelationAnalysis>> getCorrelationsByDirectFilteringColumn()
{
return correlationsByDirectFilteringColumn;
return correlations.getCorrelationsByDirectFilteringColumn();
}
public boolean isEnableFilterPushDown()
@ -143,8 +138,7 @@ public class JoinFilterPreAnalysis
@Nullable private final Filter originalFilter;
@Nullable private List<Filter> normalizedBaseTableClauses;
@Nullable private List<Filter> normalizedJoinTableClauses;
@Nullable private Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn;
@Nullable private Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn;
@Nullable private JoinFilterCorrelations correlations;
private boolean enableFilterPushDown = false;
private boolean enableFilterRewrite = false;
@Nonnull private final List<VirtualColumn> postJoinVirtualColumns;
@ -173,19 +167,11 @@ public class JoinFilterPreAnalysis
return this;
}
public Builder withCorrelationsByFilteringColumn(
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn
public Builder withCorrelations(
JoinFilterCorrelations correlations
)
{
this.correlationsByFilteringColumn = correlationsByFilteringColumn;
return this;
}
public Builder withCorrelationsByDirectFilteringColumn(
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn
)
{
this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn;
this.correlations = correlations;
return this;
}
@ -225,8 +211,7 @@ public class JoinFilterPreAnalysis
postJoinVirtualColumns,
normalizedBaseTableClauses,
normalizedJoinTableClauses,
correlationsByFilteringColumn,
correlationsByDirectFilteringColumn,
correlations,
enableFilterPushDown,
enableFilterRewrite,
equiconditions

View File

@ -33,6 +33,7 @@ import org.apache.druid.segment.join.Joinable;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class LookupJoinable implements Joinable
@ -88,7 +89,7 @@ public class LookupJoinable implements Joinable
}
@Override
public Set<String> getCorrelatedColumnValues(
public Optional<Set<String>> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName,
@ -97,7 +98,7 @@ public class LookupJoinable implements Joinable
)
{
if (!ALL_COLUMNS.contains(searchColumnName) || !ALL_COLUMNS.contains(retrievalColumnName)) {
return ImmutableSet.of();
return Optional.empty();
}
Set<String> correlatedValues;
if (LookupColumnSelectorFactory.KEY_COLUMN.equals(searchColumnName)) {
@ -109,7 +110,7 @@ public class LookupJoinable implements Joinable
}
} else {
if (!allowNonKeyColumnSearch) {
return ImmutableSet.of();
return Optional.empty();
}
if (LookupColumnSelectorFactory.VALUE_COLUMN.equals(retrievalColumnName)) {
// This should not happen in practice because the column to be joined on must be a key.
@ -120,6 +121,6 @@ public class LookupJoinable implements Joinable
correlatedValues = ImmutableSet.copyOf(extractor.unapply(searchColumnValue));
}
}
return correlatedValues;
return Optional.of(correlatedValues);
}
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.segment.join.table;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
@ -31,6 +30,7 @@ import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class IndexedTableJoinable implements Joinable
@ -82,7 +82,7 @@ public class IndexedTableJoinable implements Joinable
}
@Override
public Set<String> getCorrelatedColumnValues(
public Optional<Set<String>> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName,
@ -94,7 +94,7 @@ public class IndexedTableJoinable implements Joinable
int correlatedColumnPosition = table.rowSignature().indexOf(retrievalColumnName);
if (filterColumnPosition < 0 || correlatedColumnPosition < 0) {
return ImmutableSet.of();
return Optional.empty();
}
Set<String> correlatedValues = new HashSet<>();
@ -108,13 +108,13 @@ public class IndexedTableJoinable implements Joinable
correlatedValues.add(correlatedDimVal);
if (correlatedValues.size() > maxCorrelationSetSize) {
return ImmutableSet.of();
return Optional.empty();
}
}
return correlatedValues;
return Optional.of(correlatedValues);
} else {
if (!allowNonKeyColumnSearch) {
return ImmutableSet.of();
return Optional.empty();
}
IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition);
@ -125,12 +125,12 @@ public class IndexedTableJoinable implements Joinable
String correlatedDimVal = Objects.toString(correlatedColumnReader.read(i), null);
correlatedValues.add(correlatedDimVal);
if (correlatedValues.size() > maxCorrelationSetSize) {
return ImmutableSet.of();
return Optional.empty();
}
}
}
return correlatedValues;
return Optional.of(correlatedValues);
}
}
}

View File

@ -39,6 +39,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.FalseFilter;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
@ -1262,6 +1263,60 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
}
@Test
public void test_filterPushDown_factToCountryRightWithFilterOnValueThatMatchesNothing()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT));
Filter originalFilter = new AndFilter(
ImmutableList.of(
new SelectorFilter("channel", null),
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "NO MATCH")
)
);
JoinFilterPreAnalysis joinFilterPreAnalysis = simplePreAnalysis(
joinableClauses,
originalFilter
);
HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
joinFilterPreAnalysis
);
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
FalseFilter.instance(),
new AndFilter(
ImmutableList.of(
new SelectorFilter("channel", null),
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "NO MATCH")
)
),
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
JoinTestHelper.verifyCursors(
adapter.makeCursors(
originalFilter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
),
ImmutableList.of(
"page",
"countryIsoCode",
"countryNumber",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber"
),
ImmutableList.of()
);
}
@Test
public void test_filterPushDown_factToCountryRightWithFilterOnNullColumnsUsingLookup()
{

View File

@ -36,6 +36,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@RunWith(MockitoJUnitRunner.class)
@ -117,7 +118,7 @@ public class LookupJoinableTest
@Test
public void getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmptySet()
{
Set<String> correlatedValues =
Optional<Set<String>> correlatedValues =
target.getCorrelatedColumnValues(
UNKNOWN_COLUMN,
SEARCH_KEY_VALUE,
@ -125,13 +126,13 @@ public class LookupJoinableTest
0,
false);
Assert.assertEquals(Collections.emptySet(), correlatedValues);
Assert.assertFalse(correlatedValues.isPresent());
}
@Test
public void getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmptySet()
{
Set<String> correlatedValues =
Optional<Set<String>> correlatedValues =
target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.KEY_COLUMN,
SEARCH_KEY_VALUE,
@ -139,85 +140,85 @@ public class LookupJoinableTest
0,
false);
Assert.assertEquals(Collections.emptySet(), correlatedValues);
Assert.assertFalse(correlatedValues.isPresent());
}
@Test
public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnShouldReturnSearchValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.KEY_COLUMN,
SEARCH_KEY_VALUE,
LookupColumnSelectorFactory.KEY_COLUMN,
0,
false);
Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchKeyAndRetrieveValueColumnShouldReturnExtractedValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.KEY_COLUMN,
SEARCH_KEY_VALUE,
LookupColumnSelectorFactory.VALUE_COLUMN,
0,
false);
Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchKeyMissingAndRetrieveValueColumnShouldReturnExtractedValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.KEY_COLUMN,
SEARCH_KEY_NULL_VALUE,
LookupColumnSelectorFactory.VALUE_COLUMN,
0,
false);
Assert.assertEquals(Collections.singleton(null), correlatedValues);
Assert.assertEquals(Optional.of(Collections.singleton(null)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnSearchValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.VALUE_COLUMN,
SEARCH_VALUE_VALUE,
LookupColumnSelectorFactory.VALUE_COLUMN,
10,
false);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.VALUE_COLUMN,
SEARCH_VALUE_VALUE,
LookupColumnSelectorFactory.KEY_COLUMN,
10,
false);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnShouldReturnSearchValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.VALUE_COLUMN,
SEARCH_VALUE_VALUE,
LookupColumnSelectorFactory.VALUE_COLUMN,
0,
true);
Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnShouldReturnUnAppliedValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.VALUE_COLUMN,
SEARCH_VALUE_VALUE,
LookupColumnSelectorFactory.KEY_COLUMN,
10,
true);
Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues);
}
@Test
@ -228,24 +229,24 @@ public class LookupJoinableTest
*/
public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnWithMaxLimitSetShouldHonorMaxLimit()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.VALUE_COLUMN,
SEARCH_VALUE_VALUE,
LookupColumnSelectorFactory.KEY_COLUMN,
0,
true);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchUnknownValueAndRetrieveKeyColumnShouldReturnNoCorrelatedValues()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
LookupColumnSelectorFactory.VALUE_COLUMN,
SEARCH_VALUE_UNKNOWN,
LookupColumnSelectorFactory.KEY_COLUMN,
10,
true);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of()), correlatedValues);
}
}

View File

@ -40,6 +40,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
public class IndexedTableJoinableTest
@ -190,9 +191,9 @@ public class IndexedTableJoinableTest
}
@Test
public void getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmptySet()
public void getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmpty()
{
Set<String> correlatedValues =
Optional<Set<String>> correlatedValues =
target.getCorrelatedColumnValues(
UNKNOWN_COLUMN,
"foo",
@ -200,13 +201,13 @@ public class IndexedTableJoinableTest
MAX_CORRELATION_SET_SIZE,
false);
Assert.assertEquals(Collections.emptySet(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
}
@Test
public void getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmptySet()
public void getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmpty()
{
Set<String> correlatedValues =
Optional<Set<String>> correlatedValues =
target.getCorrelatedColumnValues(
KEY_COLUMN,
"foo",
@ -214,121 +215,121 @@ public class IndexedTableJoinableTest
MAX_CORRELATION_SET_SIZE,
false);
Assert.assertEquals(Collections.emptySet(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnShouldReturnSearchValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
KEY_COLUMN,
SEARCH_KEY_VALUE,
KEY_COLUMN,
MAX_CORRELATION_SET_SIZE,
false);
Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnAboveLimitShouldReturnEmptySet()
public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnAboveLimitShouldReturnEmpty()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
KEY_COLUMN,
SEARCH_KEY_VALUE,
KEY_COLUMN,
0,
false);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchKeyAndRetrieveValueColumnShouldReturnExtractedValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
KEY_COLUMN,
SEARCH_KEY_VALUE,
VALUE_COLUMN,
MAX_CORRELATION_SET_SIZE,
false);
Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchKeyMissingAndRetrieveValueColumnShouldReturnExtractedValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
KEY_COLUMN,
SEARCH_KEY_NULL_VALUE,
VALUE_COLUMN,
MAX_CORRELATION_SET_SIZE,
false);
Assert.assertEquals(Collections.singleton(null), correlatedValues);
Assert.assertEquals(Optional.of(Collections.singleton(null)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnSearchValue()
public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnEmpty()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
VALUE_COLUMN,
SEARCH_VALUE_VALUE,
VALUE_COLUMN,
MAX_CORRELATION_SET_SIZE,
false);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
correlatedValues = target.getCorrelatedColumnValues(
VALUE_COLUMN,
SEARCH_VALUE_VALUE,
KEY_COLUMN,
10,
false);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnShouldReturnSearchValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
VALUE_COLUMN,
SEARCH_VALUE_VALUE,
VALUE_COLUMN,
MAX_CORRELATION_SET_SIZE,
true);
Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnShouldReturnUnAppliedValue()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
VALUE_COLUMN,
SEARCH_VALUE_VALUE,
KEY_COLUMN,
10,
true);
Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnWithMaxLimitSetShouldHonorMaxLimit()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
VALUE_COLUMN,
SEARCH_VALUE_VALUE,
KEY_COLUMN,
0,
true);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.empty(), correlatedValues);
}
@Test
public void getCorrelatedColumnValuesForSearchUnknownValueAndRetrieveKeyColumnShouldReturnNoCorrelatedValues()
{
Set<String> correlatedValues = target.getCorrelatedColumnValues(
Optional<Set<String>> correlatedValues = target.getCorrelatedColumnValues(
VALUE_COLUMN,
SEARCH_VALUE_UNKNOWN,
KEY_COLUMN,
10,
true);
Assert.assertEquals(ImmutableSet.of(), correlatedValues);
Assert.assertEquals(Optional.of(ImmutableSet.of()), correlatedValues);
}
}

View File

@ -162,7 +162,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public static final String DUMMY_SQL_ID = "dummy";
public static final String LOS_ANGELES = "America/Los_Angeles";
static final ImmutableMap.Builder<String, Object> DEFAULT_QUERY_CONTEXT_BUILDER =
private static final ImmutableMap.Builder<String, Object> DEFAULT_QUERY_CONTEXT_BUILDER =
ImmutableMap.<String, Object>builder()
.put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
.put(PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z")

View File

@ -8085,6 +8085,42 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testFilterAndGroupByLookupUsingJoinOperatorWithValueFilterPushdownMatchesNothig(Map<String, Object> queryContext) throws Exception
{
// Cannot vectorize JOIN operator.
cannotVectorize();
testQuery(
"SELECT lookyloo.k, COUNT(*)\n"
+ "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+ "WHERE lookyloo.v = '123'\n"
+ "GROUP BY lookyloo.k",
queryContext,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("j0.v", "123", null))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("j0.k", "d0")))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(queryContext)
.build()
),
ImmutableList.of()
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testFilterAndGroupByLookupUsingJoinOperatorAllowNulls(Map<String, Object> queryContext) throws Exception
@ -8217,6 +8253,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
{
// Cannot vectorize JOIN operator.
cannotVectorize();
testQuery(
"SELECT lookyloo.k, COUNT(*)\n"
+ "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"