From e03d38b6c83bd84b1752d1a51baf61b194234548 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha <44787917+suneet-s@users.noreply.github.com> Date: Fri, 29 May 2020 16:53:03 -0700 Subject: [PATCH] Optimize join queries where filter matches nothing (#9931) * Refactor JoinFilterAnalyzer This patch attempts to make it easier to follow the join filter analysis code with the hope of making it easier to add rewrite optimizations in the future. To keep the patch small and easy to review, this is the first of at least 2 patches that are planned. This patch adds a builder to the Pre-Analysis, so that it is easier to instantiate the preAnalysis. It also moves some of the filter normalization code out to Filters with associated tests. * fix tests * Refactor JoinFilterAnalyzer - part 2 This change introduces the following components: * RhsRewriteCandidates - a wrapper for a list of candidates and associated functions to operate on the set of candidates. * JoinableClauses - a wrapper for the list of JoinableClause that represent a join condition and the associated functions to operate on the clauses. * Equiconditions - a wrapper representing the equiconditions that are used in the join condition. And associated test changes. This refactoring surfaced 2 bugs: - Missing equals and hashcode implementation for RhsRewriteCandidate, thus allowing potential duplicates in the rhs rewrite candidates - Missing Filter#supportsRequiredColumnRewrite check in analyzeJoinFilterClause, which could result in UnsupportedOperationException being thrown by the filter * fix compile error * remove unused class * Refactor JoinFilterAnalyzer - Correlations Move the correlation related code out into its own class so it's easier to maintain. Another patch should follow this one so that the query path uses the correlation object instead of its underlying maps. 
* Optimize join queries where filter matches nothing Fixes #9787 This PR changes the Joinable interface to return an Optional set of correlated values for a column. This allows the JoinFilterAnalyzer to differentiate between the case where the column has no matching values and the case where the correlated values could not be determined. This PR chose not to distinguish between cases where correlated values could not be computed because of a config that has this behavior disabled or because of user error - like a column that could not be found. The reasoning was that the latter is likely an error and the non filter pushdown path will surface the error if it is. --- .../apache/druid/segment/join/Joinable.java | 7 +- .../join/filter/JoinFilterAnalyzer.java | 366 +--------------- .../join/filter/JoinFilterCorrelations.java | 407 ++++++++++++++++++ .../join/filter/JoinFilterPreAnalysis.java | 35 +- .../segment/join/lookup/LookupJoinable.java | 9 +- .../join/table/IndexedTableJoinable.java | 16 +- .../segment/join/JoinFilterAnalyzerTest.java | 55 +++ .../join/lookup/LookupJoinableTest.java | 43 +- .../join/table/IndexedTableJoinableTest.java | 55 +-- .../sql/calcite/BaseCalciteQueryTest.java | 2 +- .../druid/sql/calcite/CalciteQueryTest.java | 37 ++ 11 files changed, 600 insertions(+), 432 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterCorrelations.java diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java index 1ededff5e8b..001a0fccd21 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java @@ -24,6 +24,7 @@ import org.apache.druid.segment.column.ColumnCapabilities; import javax.annotation.Nullable; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -88,9 +89,11 @@ public interface Joinable * returned 
than this limit, return an empty set. * @param allowNonKeyColumnSearch If true, allow searchs on non-key columns. If this is false, * a search on a non-key column should return an empty set. - * @return The set of correlated column values. If we cannot determine correlated values, return an empty set. + * @return The set of correlated column values. If we cannot determine correlated values, return absent. + * + * In case either the search or retrieval column names are not found, this will return absent. */ - Set getCorrelatedColumnValues( + Optional> getCorrelatedColumnValues( String searchColumnName, String searchColumnValue, String retrievalColumnName, diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java index a4f9ba666d6..f4f7d31be1c 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java @@ -28,21 +28,16 @@ import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.filter.FalseFilter; import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.filter.OrFilter; import org.apache.druid.segment.filter.SelectorFilter; -import org.apache.druid.segment.join.Equality; -import org.apache.druid.segment.join.JoinConditionAnalysis; -import org.apache.druid.segment.join.JoinableClause; -import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidate; -import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidates; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import 
java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -147,131 +142,16 @@ public class JoinFilterAnalyzer // build the equicondition map, used for determining how the tables are connected through joins Equiconditions equiconditions = preAnalysisBuilder.computeEquiconditionsFromJoinableClauses(); - RhsRewriteCandidates rhsRewriteCandidates = - RhsRewriteCandidates.getRhsRewriteCandidates(normalizedJoinTableClauses, equiconditions, joinableClauses); + JoinFilterCorrelations correlations = JoinFilterCorrelations.computeJoinFilterCorrelations( + normalizedJoinTableClauses, + equiconditions, + joinableClauses, + enableRewriteValueColumnFilters, + filterRewriteMaxSize + ); - // Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis based on the RHS rewrite candidates - Map>> correlationsByPrefix = new HashMap<>(); - Map> directRewriteCorrelations = new HashMap<>(); - for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) { - if (rhsRewriteCandidate.isDirectRewrite()) { - directRewriteCorrelations.computeIfAbsent( - rhsRewriteCandidate.getRhsColumn(), - c -> { - Optional> correlatedBaseTableColumns = - findCorrelatedBaseTableColumns( - joinableClauses, - c, - rhsRewriteCandidate, - equiconditions - ); - if (!correlatedBaseTableColumns.isPresent()) { - return Optional.empty(); - } else { - JoinFilterColumnCorrelationAnalysis baseColumnAnalysis = correlatedBaseTableColumns.get().get(c); - // for direct rewrites, there will only be one analysis keyed by the RHS column - assert (baseColumnAnalysis != null); - return Optional.of(correlatedBaseTableColumns.get().get(c)); - } - } - ); - } else { - correlationsByPrefix.computeIfAbsent( - rhsRewriteCandidate.getJoinableClause().getPrefix(), - p -> findCorrelatedBaseTableColumns( - joinableClauses, - p, - rhsRewriteCandidate, - equiconditions - ) - ); - } - } - - // Using the RHS table prefix -> JoinFilterColumnCorrelationAnalysis 
created in the previous step, - // build a map of rhsFilterColumn -> Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues for specific filter pair - // The Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues mappings are stored in the - // JoinFilterColumnCorrelationAnalysis objects, which are shared across all rhsFilterColumn entries that belong - // to the same RHS table. - // - // The value is a List instead of a single value because a table can be joined - // to another via multiple columns. - // (See JoinFilterAnalyzerTest.test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS for an example) - Map> correlationsByFilteringColumn = new LinkedHashMap<>(); - Map> correlationsByDirectFilteringColumn = new LinkedHashMap<>(); - for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) { - if (rhsRewriteCandidate.isDirectRewrite()) { - List perColumnCorrelations = - correlationsByDirectFilteringColumn.computeIfAbsent( - rhsRewriteCandidate.getRhsColumn(), - (rhsCol) -> new ArrayList<>() - ); - perColumnCorrelations.add( - directRewriteCorrelations.get(rhsRewriteCandidate.getRhsColumn()).get() - ); - continue; - } - - Optional> correlationsForPrefix = correlationsByPrefix.get( - rhsRewriteCandidate.getJoinableClause().getPrefix() - ); - if (correlationsForPrefix.isPresent()) { - for (Map.Entry correlationForPrefix : correlationsForPrefix.get() - .entrySet()) { - List perColumnCorrelations = - correlationsByFilteringColumn.computeIfAbsent( - rhsRewriteCandidate.getRhsColumn(), - (rhsCol) -> new ArrayList<>() - ); - perColumnCorrelations.add(correlationForPrefix.getValue()); - correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent( - Pair.of(rhsRewriteCandidate.getRhsColumn(), rhsRewriteCandidate.getValueForRewrite()), - (rhsVal) -> { - Set correlatedValues = getCorrelatedValuesForPushDown( - rhsRewriteCandidate.getRhsColumn(), - rhsRewriteCandidate.getValueForRewrite(), - 
correlationForPrefix.getValue().getJoinColumn(), - rhsRewriteCandidate.getJoinableClause(), - enableRewriteValueColumnFilters, - filterRewriteMaxSize - ); - - if (correlatedValues.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(correlatedValues); - } - } - ); - } - } else { - correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), null); - } - } - - // Go through each per-column analysis list and prune duplicates - for (Map.Entry> correlation : correlationsByFilteringColumn - .entrySet()) { - if (correlation.getValue() != null) { - List dedupList = eliminateCorrelationDuplicates( - correlation.getValue() - ); - correlationsByFilteringColumn.put(correlation.getKey(), dedupList); - } - } - for (Map.Entry> correlation : correlationsByDirectFilteringColumn - .entrySet()) { - if (correlation.getValue() != null) { - List dedupList = eliminateCorrelationDuplicates( - correlation.getValue() - ); - correlationsByDirectFilteringColumn.put(correlation.getKey(), dedupList); - } - } - preAnalysisBuilder.withCorrelationsByFilteringColumn(correlationsByFilteringColumn) - .withCorrelationsByDirectFilteringColumn(correlationsByDirectFilteringColumn); - - return preAnalysisBuilder.build(); + return preAnalysisBuilder.withCorrelations(correlations) + .build(); } /** @@ -561,10 +441,20 @@ public class JoinFilterAnalyzer return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter); } + Set newFilterValues = correlatedValues.get(); + // in nothing => match nothing + if (newFilterValues.isEmpty()) { + return new JoinFilterAnalysis( + true, + selectorFilter, + FalseFilter.instance() + ); + } + for (String correlatedBaseColumn : correlationAnalysis.getBaseColumns()) { Filter rewrittenFilter = new InDimFilter( correlatedBaseColumn, - correlatedValues.get(), + newFilterValues, null, null ).toFilter(); @@ -587,7 +477,7 @@ public class JoinFilterAnalyzer Filter rewrittenFilter = new InDimFilter( pushDownVirtualColumn.getOutputName(), - 
correlatedValues.get(), + newFilterValues, null, null ).toFilter(); @@ -613,218 +503,6 @@ public class JoinFilterAnalyzer return PUSH_DOWN_VIRTUAL_COLUMN_NAME_BASE + counter; } - /** - * Helper method for rewriting filters on join table columns into filters on base table columns. - * - * @param filterColumn A join table column that we're filtering on - * @param filterValue The value to filter on - * @param correlatedJoinColumn A join table column that appears as the RHS of an equicondition, which we can correlate - * with a column on the base table - * @param clauseForFilteredTable The joinable clause that corresponds to the join table being filtered on - * - * @return A list of values of the correlatedJoinColumn that appear in rows where filterColumn = filterValue - * Returns an empty set if we cannot determine the correlated values. - */ - private static Set getCorrelatedValuesForPushDown( - String filterColumn, - String filterValue, - String correlatedJoinColumn, - JoinableClause clauseForFilteredTable, - boolean enableRewriteValueColumnFilters, - long filterRewriteMaxSize - ) - { - String filterColumnNoPrefix = filterColumn.substring(clauseForFilteredTable.getPrefix().length()); - String correlatedColumnNoPrefix = correlatedJoinColumn.substring(clauseForFilteredTable.getPrefix().length()); - - return clauseForFilteredTable.getJoinable().getCorrelatedColumnValues( - filterColumnNoPrefix, - filterValue, - correlatedColumnNoPrefix, - filterRewriteMaxSize, - enableRewriteValueColumnFilters - ); - } - - /** - * For each rhs column that appears in the equiconditions for a table's JoinableClause, - * we try to determine what base table columns are related to the rhs column through the total set of equiconditions. - * We do this by searching backwards through the chain of join equiconditions using the provided equicondition map. 
- * - * For example, suppose we have 3 tables, A,B,C, joined with the following conditions, where A is the base table: - * A.joinColumn == B.joinColumn - * B.joinColum == C.joinColumn - * - * We would determine that C.joinColumn is correlated with A.joinColumn: we first see that - * C.joinColumn is linked to B.joinColumn which in turn is linked to A.joinColumn - * - * Suppose we had the following join conditions instead: - * f(A.joinColumn) == B.joinColumn - * B.joinColum == C.joinColumn - * In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn would be linked to f(A.joinColumn). - * - * Suppose we had the following join conditions instead: - * A.joinColumn == B.joinColumn - * f(B.joinColum) == C.joinColumn - * - * Because we cannot reverse the function f() applied to the second table B in all cases, - * we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn - * - * @param joinableClauses List of joinable clauses for the query - * @param tablePrefix Prefix for a join table - * @param rhsRewriteCandidate RHS rewrite candidate that we find correlated base table columns for - * @param equiConditions Map of equiconditions, keyed by the right hand columns - * - * @return A list of correlatation analyses for the equicondition RHS columns that reside in the table associated with - * the tablePrefix - */ - private static Optional> findCorrelatedBaseTableColumns( - JoinableClauses joinableClauses, - String tablePrefix, - RhsRewriteCandidate rhsRewriteCandidate, - Equiconditions equiConditions - ) - { - JoinableClause clauseForTablePrefix = rhsRewriteCandidate.getJoinableClause(); - JoinConditionAnalysis jca = clauseForTablePrefix.getCondition(); - - Set rhsColumns = new HashSet<>(); - if (rhsRewriteCandidate.isDirectRewrite()) { - // If we filter on a RHS join column, we only need to consider that column from the RHS side - rhsColumns.add(rhsRewriteCandidate.getRhsColumn()); - } else { - for (Equality eq : 
jca.getEquiConditions()) { - rhsColumns.add(tablePrefix + eq.getRightColumn()); - } - } - - Map correlations = new LinkedHashMap<>(); - - for (String rhsColumn : rhsColumns) { - Set correlatedBaseColumns = new HashSet<>(); - Set correlatedBaseExpressions = new HashSet<>(); - - getCorrelationForRHSColumn( - joinableClauses, - equiConditions, - rhsColumn, - correlatedBaseColumns, - correlatedBaseExpressions - ); - - if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) { - continue; - } - - correlations.put( - rhsColumn, - new JoinFilterColumnCorrelationAnalysis( - rhsColumn, - correlatedBaseColumns, - correlatedBaseExpressions - ) - ); - } - - if (correlations.size() == 0) { - return Optional.empty(); - } else { - return Optional.of(correlations); - } - } - - /** - * Helper method for {@link #findCorrelatedBaseTableColumns} that determines correlated base table columns - * and/or expressions for a single RHS column and adds them to the provided sets as it traverses the - * equicondition column relationships. - * - * @param equiConditions Map of equiconditions, keyed by the right hand columns - * @param rhsColumn RHS column to find base table correlations for - * @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified. - * @param correlatedBaseExpressions Set of correlated base column expressions for the provided RHS column. Will be - * modified. 
- */ - private static void getCorrelationForRHSColumn( - JoinableClauses joinableClauses, - Equiconditions equiConditions, - String rhsColumn, - Set correlatedBaseColumns, - Set correlatedBaseExpressions - ) - { - String findMappingFor = rhsColumn; - Set lhsExprs = equiConditions.getLhsExprs(findMappingFor); - if (lhsExprs == null) { - return; - } - - for (Expr lhsExpr : lhsExprs) { - String identifier = lhsExpr.getBindingIfIdentifier(); - if (identifier == null) { - // We push down if the function only requires base table columns - Expr.BindingDetails bindingDetails = lhsExpr.analyzeInputs(); - Set requiredBindings = bindingDetails.getRequiredBindings(); - - if (joinableClauses.areSomeColumnsFromJoin(requiredBindings)) { - break; - } - correlatedBaseExpressions.add(lhsExpr); - } else { - // simple identifier, see if we can correlate it with a column on the base table - findMappingFor = identifier; - if (joinableClauses.getColumnFromJoinIfExists(identifier) == null) { - correlatedBaseColumns.add(findMappingFor); - } else { - getCorrelationForRHSColumn( - joinableClauses, - equiConditions, - findMappingFor, - correlatedBaseColumns, - correlatedBaseExpressions - ); - } - } - } - } - - /** - * Given a list of JoinFilterColumnCorrelationAnalysis, prune the list so that we only have one - * JoinFilterColumnCorrelationAnalysis for each unique combination of base columns. - * - * Suppose we have a join condition like the following, where A is the base table: - * A.joinColumn == B.joinColumn && A.joinColumn == B.joinColumn2 - * - * We only need to consider one correlation to A.joinColumn since B.joinColumn and B.joinColumn2 must - * have the same value in any row that matches the join condition. - * - * In the future this method could consider which column correlation should be preserved based on availability of - * indices and other heuristics. 
- * - * When push down of filters with LHS expressions in the join condition is supported, this method should also - * consider expressions. - * - * @param originalList Original list of column correlation analyses. - * - * @return Pruned list of column correlation analyses. - */ - private static List eliminateCorrelationDuplicates( - List originalList - ) - { - Map, JoinFilterColumnCorrelationAnalysis> uniquesMap = new HashMap<>(); - - for (JoinFilterColumnCorrelationAnalysis jca : originalList) { - Set mapKey = new HashSet<>(jca.getBaseColumns()); - for (Expr expr : jca.getBaseExpressions()) { - mapKey.add(expr.stringify()); - } - - uniquesMap.put(mapKey, jca); - } - - return new ArrayList<>(uniquesMap.values()); - } - private static boolean isColumnFromPostJoinVirtualColumns( List postJoinVirtualColumns, String column diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterCorrelations.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterCorrelations.java new file mode 100644 index 00000000000..84fbccd8a57 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterCorrelations.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.filter; + +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.math.expr.Expr; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.join.Equality; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinableClause; +import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidate; +import org.apache.druid.segment.join.filter.rewrite.RhsRewriteCandidates; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * A wrapper class for correlation analyses of different filters involved in the query. It contains: + * + * - A mapping of RHS filtering columns -> List, used for filter rewrites + * - A second mapping of RHS filtering columns -> List, used for direct filter rewrites + */ +public class JoinFilterCorrelations +{ + private final Map> correlationsByFilteringColumn; + private final Map> correlationsByDirectFilteringColumn; + + private JoinFilterCorrelations( + Map> correlationsByFilteringColumn, + Map> correlationsByDirectFilteringColumn + ) + { + this.correlationsByFilteringColumn = correlationsByFilteringColumn; + this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn; + } + + public Map> getCorrelationsByFilteringColumn() + { + return correlationsByFilteringColumn; + } + + public Map> getCorrelationsByDirectFilteringColumn() + { + return correlationsByDirectFilteringColumn; + } + + public static JoinFilterCorrelations computeJoinFilterCorrelations( + List normalizedJoinTableClauses, + Equiconditions equiconditions, + JoinableClauses joinableClauses, + boolean enableRewriteValueColumnFilters, + long filterRewriteMaxSize + ) 
+ { + RhsRewriteCandidates rhsRewriteCandidates = + RhsRewriteCandidates.getRhsRewriteCandidates(normalizedJoinTableClauses, equiconditions, joinableClauses); + + // Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis based on the RHS rewrite candidates + Map>> correlationsByPrefix = new HashMap<>(); + Map> directRewriteCorrelations = new HashMap<>(); + for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) { + if (rhsRewriteCandidate.isDirectRewrite()) { + directRewriteCorrelations.computeIfAbsent( + rhsRewriteCandidate.getRhsColumn(), + c -> { + Optional> correlatedBaseTableColumns = + findCorrelatedBaseTableColumns( + joinableClauses, + c, + rhsRewriteCandidate, + equiconditions + ); + if (!correlatedBaseTableColumns.isPresent()) { + return Optional.empty(); + } else { + JoinFilterColumnCorrelationAnalysis baseColumnAnalysis = correlatedBaseTableColumns.get().get(c); + // for direct rewrites, there will only be one analysis keyed by the RHS column + assert (baseColumnAnalysis != null); + return Optional.of(correlatedBaseTableColumns.get().get(c)); + } + } + ); + } else { + correlationsByPrefix.computeIfAbsent( + rhsRewriteCandidate.getJoinableClause().getPrefix(), + p -> findCorrelatedBaseTableColumns( + joinableClauses, + p, + rhsRewriteCandidate, + equiconditions + ) + ); + } + } + + // Using the RHS table prefix -> JoinFilterColumnCorrelationAnalysis created in the previous step, + // build a map of rhsFilterColumn -> Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues for specific filter pair + // The Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues mappings are stored in the + // JoinFilterColumnCorrelationAnalysis objects, which are shared across all rhsFilterColumn entries that belong + // to the same RHS table. + // + // The value is a List instead of a single value because a table can be joined + // to another via multiple columns. 
+ // (See JoinFilterAnalyzerTest.test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS for an example) + Map> correlationsByFilteringColumn = new LinkedHashMap<>(); + Map> correlationsByDirectFilteringColumn = new LinkedHashMap<>(); + for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates.getRhsRewriteCandidates()) { + if (rhsRewriteCandidate.isDirectRewrite()) { + List perColumnCorrelations = + correlationsByDirectFilteringColumn.computeIfAbsent( + rhsRewriteCandidate.getRhsColumn(), + (rhsCol) -> new ArrayList<>() + ); + perColumnCorrelations.add( + directRewriteCorrelations.get(rhsRewriteCandidate.getRhsColumn()).get() + ); + continue; + } + + Optional> correlationsForPrefix = correlationsByPrefix.get( + rhsRewriteCandidate.getJoinableClause().getPrefix() + ); + if (correlationsForPrefix.isPresent()) { + for (Map.Entry correlationForPrefix : correlationsForPrefix.get() + .entrySet()) { + List perColumnCorrelations = + correlationsByFilteringColumn.computeIfAbsent( + rhsRewriteCandidate.getRhsColumn(), + (rhsCol) -> new ArrayList<>() + ); + perColumnCorrelations.add(correlationForPrefix.getValue()); + correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent( + Pair.of(rhsRewriteCandidate.getRhsColumn(), rhsRewriteCandidate.getValueForRewrite()), + (rhsVal) -> { + Optional> correlatedValues = getCorrelatedValuesForPushDown( + rhsRewriteCandidate.getRhsColumn(), + rhsRewriteCandidate.getValueForRewrite(), + correlationForPrefix.getValue().getJoinColumn(), + rhsRewriteCandidate.getJoinableClause(), + enableRewriteValueColumnFilters, + filterRewriteMaxSize + ); + return correlatedValues; + } + ); + } + } else { + correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), null); + } + } + + // Go through each per-column analysis list and prune duplicates + for (Map.Entry> correlation : correlationsByFilteringColumn + .entrySet()) { + if (correlation.getValue() != null) { + List dedupList = 
eliminateCorrelationDuplicates( + correlation.getValue() + ); + correlationsByFilteringColumn.put(correlation.getKey(), dedupList); + } + } + for (Map.Entry> correlation : correlationsByDirectFilteringColumn + .entrySet()) { + if (correlation.getValue() != null) { + List dedupList = eliminateCorrelationDuplicates( + correlation.getValue() + ); + correlationsByDirectFilteringColumn.put(correlation.getKey(), dedupList); + } + } + return new JoinFilterCorrelations(correlationsByFilteringColumn, correlationsByDirectFilteringColumn); + } + + + /** + * Given a list of JoinFilterColumnCorrelationAnalysis, prune the list so that we only have one + * JoinFilterColumnCorrelationAnalysis for each unique combination of base columns. + *

+ * Suppose we have a join condition like the following, where A is the base table: + * A.joinColumn == B.joinColumn && A.joinColumn == B.joinColumn2 + *

+ * We only need to consider one correlation to A.joinColumn since B.joinColumn and B.joinColumn2 must + * have the same value in any row that matches the join condition. + *

+ * In the future this method could consider which column correlation should be preserved based on availability of + * indices and other heuristics. + *

+ * When push down of filters with LHS expressions in the join condition is supported, this method should also + * consider expressions. + * + * @param originalList Original list of column correlation analyses. + * @return Pruned list of column correlation analyses. + */ + private static List eliminateCorrelationDuplicates( + List originalList + ) + { + Map, JoinFilterColumnCorrelationAnalysis> uniquesMap = new HashMap<>(); + + for (JoinFilterColumnCorrelationAnalysis jca : originalList) { + Set mapKey = new HashSet<>(jca.getBaseColumns()); + for (Expr expr : jca.getBaseExpressions()) { + mapKey.add(expr.stringify()); + } + + uniquesMap.put(mapKey, jca); + } + + return new ArrayList<>(uniquesMap.values()); + } + + + /** + * Helper method for rewriting filters on join table columns into filters on base table columns. + * + * @param filterColumn A join table column that we're filtering on + * @param filterValue The value to filter on + * @param correlatedJoinColumn A join table column that appears as the RHS of an equicondition, which we can correlate + * with a column on the base table + * @param clauseForFilteredTable The joinable clause that corresponds to the join table being filtered on + * @return A list of values of the correlatedJoinColumn that appear in rows where filterColumn = filterValue + * Returns absent if we cannot determine the correlated values. 
+ */ + private static Optional> getCorrelatedValuesForPushDown( + String filterColumn, + String filterValue, + String correlatedJoinColumn, + JoinableClause clauseForFilteredTable, + boolean enableRewriteValueColumnFilters, + long filterRewriteMaxSize + ) + { + String filterColumnNoPrefix = filterColumn.substring(clauseForFilteredTable.getPrefix().length()); + String correlatedColumnNoPrefix = correlatedJoinColumn.substring(clauseForFilteredTable.getPrefix().length()); + + return clauseForFilteredTable.getJoinable().getCorrelatedColumnValues( + filterColumnNoPrefix, + filterValue, + correlatedColumnNoPrefix, + filterRewriteMaxSize, + enableRewriteValueColumnFilters + ); + } + + /** + * For each rhs column that appears in the equiconditions for a table's JoinableClause, + * we try to determine what base table columns are related to the rhs column through the total set of equiconditions. + * We do this by searching backwards through the chain of join equiconditions using the provided equicondition map. + *

+ * For example, suppose we have 3 tables, A,B,C, joined with the following conditions, where A is the base table: + * A.joinColumn == B.joinColumn + * B.joinColumn == C.joinColumn + *

+ * We would determine that C.joinColumn is correlated with A.joinColumn: we first see that + * C.joinColumn is linked to B.joinColumn which in turn is linked to A.joinColumn + *

+ * Suppose we had the following join conditions instead: + * f(A.joinColumn) == B.joinColumn + * B.joinColumn == C.joinColumn + * In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn would be linked to f(A.joinColumn). + *

+ * Suppose we had the following join conditions instead: + * A.joinColumn == B.joinColumn + * f(B.joinColumn) == C.joinColumn + *

+ * Because we cannot reverse the function f() applied to the second table B in all cases, + * we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn + * + * @param joinableClauses List of joinable clauses for the query + * @param tablePrefix Prefix for a join table + * @param rhsRewriteCandidate RHS rewrite candidate that we find correlated base table columns for + * @param equiConditions Map of equiconditions, keyed by the right hand columns + * @return A list of correlatation analyses for the equicondition RHS columns that reside in the table associated with + * the tablePrefix + */ + private static Optional> findCorrelatedBaseTableColumns( + JoinableClauses joinableClauses, + String tablePrefix, + RhsRewriteCandidate rhsRewriteCandidate, + Equiconditions equiConditions + ) + { + JoinableClause clauseForTablePrefix = rhsRewriteCandidate.getJoinableClause(); + JoinConditionAnalysis jca = clauseForTablePrefix.getCondition(); + + Set rhsColumns = new HashSet<>(); + if (rhsRewriteCandidate.isDirectRewrite()) { + // If we filter on a RHS join column, we only need to consider that column from the RHS side + rhsColumns.add(rhsRewriteCandidate.getRhsColumn()); + } else { + for (Equality eq : jca.getEquiConditions()) { + rhsColumns.add(tablePrefix + eq.getRightColumn()); + } + } + + Map correlations = new LinkedHashMap<>(); + + for (String rhsColumn : rhsColumns) { + Set correlatedBaseColumns = new HashSet<>(); + Set correlatedBaseExpressions = new HashSet<>(); + + getCorrelationForRHSColumn( + joinableClauses, + equiConditions, + rhsColumn, + correlatedBaseColumns, + correlatedBaseExpressions + ); + + if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) { + continue; + } + + correlations.put( + rhsColumn, + new JoinFilterColumnCorrelationAnalysis( + rhsColumn, + correlatedBaseColumns, + correlatedBaseExpressions + ) + ); + } + + if (correlations.size() == 0) { + return Optional.empty(); + } else { + 
return Optional.of(correlations); + } + } + + /** + * Helper method for {@link #findCorrelatedBaseTableColumns} that determines correlated base table columns + * and/or expressions for a single RHS column and adds them to the provided sets as it traverses the + * equicondition column relationships. + * + * @param equiConditions Map of equiconditions, keyed by the right hand columns + * @param rhsColumn RHS column to find base table correlations for + * @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified. + * @param correlatedBaseExpressions Set of correlated base column expressions for the provided RHS column. Will be + * modified. + */ + private static void getCorrelationForRHSColumn( + JoinableClauses joinableClauses, + Equiconditions equiConditions, + String rhsColumn, + Set correlatedBaseColumns, + Set correlatedBaseExpressions + ) + { + String findMappingFor = rhsColumn; + Set lhsExprs = equiConditions.getLhsExprs(findMappingFor); + if (lhsExprs == null) { + return; + } + + for (Expr lhsExpr : lhsExprs) { + String identifier = lhsExpr.getBindingIfIdentifier(); + if (identifier == null) { + // We push down if the function only requires base table columns + Expr.BindingDetails bindingDetails = lhsExpr.analyzeInputs(); + Set requiredBindings = bindingDetails.getRequiredBindings(); + + if (joinableClauses.areSomeColumnsFromJoin(requiredBindings)) { + break; + } + correlatedBaseExpressions.add(lhsExpr); + } else { + // simple identifier, see if we can correlate it with a column on the base table + findMappingFor = identifier; + if (joinableClauses.getColumnFromJoinIfExists(identifier) == null) { + correlatedBaseColumns.add(findMappingFor); + } else { + getCorrelationForRHSColumn( + joinableClauses, + equiConditions, + findMappingFor, + correlatedBaseColumns, + correlatedBaseExpressions + ); + } + } + } + } +} diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java index 5b3217cbd2f..ae3731db77a 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java @@ -41,8 +41,6 @@ import java.util.Set; * - The query's original filter (if any) * - A list of filter clauses from the original filter's CNF representation that only reference the base table * - A list of filter clauses from the original filter's CNF representation that reference RHS join tables - * - A mapping of RHS filtering columns -> List, used for filter rewrites - * - A second mapping of RHS filtering columns -> List, used for direct filter rewrites * - A list of virtual columns that can only be computed post-join * - Control flag booleans for whether filter push down and RHS rewrites are enabled. 
*/ @@ -52,8 +50,7 @@ public class JoinFilterPreAnalysis private final Filter originalFilter; private final List normalizedBaseTableClauses; private final List normalizedJoinTableClauses; - private final Map> correlationsByFilteringColumn; - private final Map> correlationsByDirectFilteringColumn; + private final JoinFilterCorrelations correlations; private final boolean enableFilterPushDown; private final boolean enableFilterRewrite; private final List postJoinVirtualColumns; @@ -65,8 +62,7 @@ public class JoinFilterPreAnalysis final List postJoinVirtualColumns, final List normalizedBaseTableClauses, final List normalizedJoinTableClauses, - final Map> correlationsByFilteringColumn, - final Map> correlationsByDirectFilteringColumn, + JoinFilterCorrelations correlations, final boolean enableFilterPushDown, final boolean enableFilterRewrite, final Equiconditions equiconditions @@ -77,8 +73,7 @@ public class JoinFilterPreAnalysis this.postJoinVirtualColumns = postJoinVirtualColumns; this.normalizedBaseTableClauses = normalizedBaseTableClauses; this.normalizedJoinTableClauses = normalizedJoinTableClauses; - this.correlationsByFilteringColumn = correlationsByFilteringColumn; - this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn; + this.correlations = correlations; this.enableFilterPushDown = enableFilterPushDown; this.enableFilterRewrite = enableFilterRewrite; this.equiconditions = equiconditions; @@ -111,12 +106,12 @@ public class JoinFilterPreAnalysis public Map> getCorrelationsByFilteringColumn() { - return correlationsByFilteringColumn; + return correlations.getCorrelationsByFilteringColumn(); } public Map> getCorrelationsByDirectFilteringColumn() { - return correlationsByDirectFilteringColumn; + return correlations.getCorrelationsByDirectFilteringColumn(); } public boolean isEnableFilterPushDown() @@ -143,8 +138,7 @@ public class JoinFilterPreAnalysis @Nullable private final Filter originalFilter; @Nullable private List 
normalizedBaseTableClauses; @Nullable private List normalizedJoinTableClauses; - @Nullable private Map> correlationsByFilteringColumn; - @Nullable private Map> correlationsByDirectFilteringColumn; + @Nullable private JoinFilterCorrelations correlations; private boolean enableFilterPushDown = false; private boolean enableFilterRewrite = false; @Nonnull private final List postJoinVirtualColumns; @@ -173,19 +167,11 @@ public class JoinFilterPreAnalysis return this; } - public Builder withCorrelationsByFilteringColumn( - Map> correlationsByFilteringColumn + public Builder withCorrelations( + JoinFilterCorrelations correlations ) { - this.correlationsByFilteringColumn = correlationsByFilteringColumn; - return this; - } - - public Builder withCorrelationsByDirectFilteringColumn( - Map> correlationsByDirectFilteringColumn - ) - { - this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn; + this.correlations = correlations; return this; } @@ -225,8 +211,7 @@ public class JoinFilterPreAnalysis postJoinVirtualColumns, normalizedBaseTableClauses, normalizedJoinTableClauses, - correlationsByFilteringColumn, - correlationsByDirectFilteringColumn, + correlations, enableFilterPushDown, enableFilterRewrite, equiconditions diff --git a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java index 353808dd6b3..7fae4e0db3b 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java @@ -33,6 +33,7 @@ import org.apache.druid.segment.join.Joinable; import javax.annotation.Nullable; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; public class LookupJoinable implements Joinable @@ -88,7 +89,7 @@ public class LookupJoinable implements Joinable } @Override - public Set 
getCorrelatedColumnValues( + public Optional> getCorrelatedColumnValues( String searchColumnName, String searchColumnValue, String retrievalColumnName, @@ -97,7 +98,7 @@ public class LookupJoinable implements Joinable ) { if (!ALL_COLUMNS.contains(searchColumnName) || !ALL_COLUMNS.contains(retrievalColumnName)) { - return ImmutableSet.of(); + return Optional.empty(); } Set correlatedValues; if (LookupColumnSelectorFactory.KEY_COLUMN.equals(searchColumnName)) { @@ -109,7 +110,7 @@ public class LookupJoinable implements Joinable } } else { if (!allowNonKeyColumnSearch) { - return ImmutableSet.of(); + return Optional.empty(); } if (LookupColumnSelectorFactory.VALUE_COLUMN.equals(retrievalColumnName)) { // This should not happen in practice because the column to be joined on must be a key. @@ -120,6 +121,6 @@ public class LookupJoinable implements Joinable correlatedValues = ImmutableSet.copyOf(extractor.unapply(searchColumnValue)); } } - return correlatedValues; + return Optional.of(correlatedValues); } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java index a661b5ad21c..66ef1213d6f 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java @@ -19,7 +19,6 @@ package org.apache.druid.segment.join.table; -import com.google.common.collect.ImmutableSet; import it.unimi.dsi.fastutil.ints.IntList; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnCapabilities; @@ -31,6 +30,7 @@ import javax.annotation.Nullable; import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; public class IndexedTableJoinable implements Joinable @@ -82,7 +82,7 @@ public class IndexedTableJoinable implements 
Joinable } @Override - public Set getCorrelatedColumnValues( + public Optional> getCorrelatedColumnValues( String searchColumnName, String searchColumnValue, String retrievalColumnName, @@ -94,7 +94,7 @@ public class IndexedTableJoinable implements Joinable int correlatedColumnPosition = table.rowSignature().indexOf(retrievalColumnName); if (filterColumnPosition < 0 || correlatedColumnPosition < 0) { - return ImmutableSet.of(); + return Optional.empty(); } Set correlatedValues = new HashSet<>(); @@ -108,13 +108,13 @@ public class IndexedTableJoinable implements Joinable correlatedValues.add(correlatedDimVal); if (correlatedValues.size() > maxCorrelationSetSize) { - return ImmutableSet.of(); + return Optional.empty(); } } - return correlatedValues; + return Optional.of(correlatedValues); } else { if (!allowNonKeyColumnSearch) { - return ImmutableSet.of(); + return Optional.empty(); } IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition); @@ -125,12 +125,12 @@ public class IndexedTableJoinable implements Joinable String correlatedDimVal = Objects.toString(correlatedColumnReader.read(i), null); correlatedValues.add(correlatedDimVal); if (correlatedValues.size() > maxCorrelationSetSize) { - return ImmutableSet.of(); + return Optional.empty(); } } } - return correlatedValues; + return Optional.of(correlatedValues); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java index e1e480be54e..965dd2302f1 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.filter.AndFilter; import org.apache.druid.segment.filter.BoundFilter; +import 
org.apache.druid.segment.filter.FalseFilter; import org.apache.druid.segment.filter.OrFilter; import org.apache.druid.segment.filter.SelectorFilter; import org.apache.druid.segment.join.filter.JoinFilterAnalyzer; @@ -1262,6 +1263,60 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes ); } + @Test + public void test_filterPushDown_factToCountryRightWithFilterOnValueThatMatchesNothing() + { + List joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT)); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "NO MATCH") + ) + ); + JoinFilterPreAnalysis joinFilterPreAnalysis = simplePreAnalysis( + joinableClauses, + originalFilter + ); + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + joinableClauses, + joinFilterPreAnalysis + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + FalseFilter.instance(), + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "NO MATCH") + ) + ), + ImmutableSet.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + "countryIsoCode", + "countryNumber", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber" + ), + ImmutableList.of() + ); + } + @Test public void test_filterPushDown_factToCountryRightWithFilterOnNullColumnsUsingLookup() { diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/lookup/LookupJoinableTest.java b/processing/src/test/java/org/apache/druid/segment/join/lookup/LookupJoinableTest.java index 4115b84acec..2037f776317 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/lookup/LookupJoinableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/lookup/LookupJoinableTest.java @@ -36,6 +36,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; @RunWith(MockitoJUnitRunner.class) @@ -117,7 +118,7 @@ public class LookupJoinableTest @Test public void getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmptySet() { - Set correlatedValues = + Optional> correlatedValues = target.getCorrelatedColumnValues( UNKNOWN_COLUMN, SEARCH_KEY_VALUE, @@ -125,13 +126,13 @@ public class LookupJoinableTest 0, false); - Assert.assertEquals(Collections.emptySet(), correlatedValues); + Assert.assertFalse(correlatedValues.isPresent()); } @Test public void getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmptySet() { - Set correlatedValues = + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.KEY_COLUMN, SEARCH_KEY_VALUE, @@ -139,85 +140,85 @@ public class LookupJoinableTest 0, false); - Assert.assertEquals(Collections.emptySet(), correlatedValues); + Assert.assertFalse(correlatedValues.isPresent()); } @Test public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnShouldReturnSearchValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.KEY_COLUMN, SEARCH_KEY_VALUE, LookupColumnSelectorFactory.KEY_COLUMN, 0, false); - Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues); } @Test public void 
getCorrelatedColumnValuesForSearchKeyAndRetrieveValueColumnShouldReturnExtractedValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.KEY_COLUMN, SEARCH_KEY_VALUE, LookupColumnSelectorFactory.VALUE_COLUMN, 0, false); - Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchKeyMissingAndRetrieveValueColumnShouldReturnExtractedValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.KEY_COLUMN, SEARCH_KEY_NULL_VALUE, LookupColumnSelectorFactory.VALUE_COLUMN, 0, false); - Assert.assertEquals(Collections.singleton(null), correlatedValues); + Assert.assertEquals(Optional.of(Collections.singleton(null)), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnSearchValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.VALUE_COLUMN, SEARCH_VALUE_VALUE, LookupColumnSelectorFactory.VALUE_COLUMN, 10, false); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.VALUE_COLUMN, SEARCH_VALUE_VALUE, LookupColumnSelectorFactory.KEY_COLUMN, 10, false); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnShouldReturnSearchValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = 
target.getCorrelatedColumnValues( LookupColumnSelectorFactory.VALUE_COLUMN, SEARCH_VALUE_VALUE, LookupColumnSelectorFactory.VALUE_COLUMN, 0, true); - Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnShouldReturnUnAppliedValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.VALUE_COLUMN, SEARCH_VALUE_VALUE, LookupColumnSelectorFactory.KEY_COLUMN, 10, true); - Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues); } @Test @@ -228,24 +229,24 @@ public class LookupJoinableTest */ public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnWithMaxLimitSetShouldHonorMaxLimit() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.VALUE_COLUMN, SEARCH_VALUE_VALUE, LookupColumnSelectorFactory.KEY_COLUMN, 0, true); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchUnknownValueAndRetrieveKeyColumnShouldReturnNoCorrelatedValues() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( LookupColumnSelectorFactory.VALUE_COLUMN, SEARCH_VALUE_UNKNOWN, LookupColumnSelectorFactory.KEY_COLUMN, 10, true); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of()), correlatedValues); } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java 
b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java index 89f78dc523d..20cfdb786fb 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java @@ -40,6 +40,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Collections; +import java.util.Optional; import java.util.Set; public class IndexedTableJoinableTest @@ -190,9 +191,9 @@ public class IndexedTableJoinableTest } @Test - public void getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmptySet() + public void getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmpty() { - Set correlatedValues = + Optional> correlatedValues = target.getCorrelatedColumnValues( UNKNOWN_COLUMN, "foo", @@ -200,13 +201,13 @@ public class IndexedTableJoinableTest MAX_CORRELATION_SET_SIZE, false); - Assert.assertEquals(Collections.emptySet(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); } @Test - public void getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmptySet() + public void getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmpty() { - Set correlatedValues = + Optional> correlatedValues = target.getCorrelatedColumnValues( KEY_COLUMN, "foo", @@ -214,121 +215,121 @@ public class IndexedTableJoinableTest MAX_CORRELATION_SET_SIZE, false); - Assert.assertEquals(Collections.emptySet(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnShouldReturnSearchValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( KEY_COLUMN, SEARCH_KEY_VALUE, KEY_COLUMN, MAX_CORRELATION_SET_SIZE, false); - Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues); + 
Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues); } @Test - public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnAboveLimitShouldReturnEmptySet() + public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnAboveLimitShouldReturnEmpty() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( KEY_COLUMN, SEARCH_KEY_VALUE, KEY_COLUMN, 0, false); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchKeyAndRetrieveValueColumnShouldReturnExtractedValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( KEY_COLUMN, SEARCH_KEY_VALUE, VALUE_COLUMN, MAX_CORRELATION_SET_SIZE, false); - Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchKeyMissingAndRetrieveValueColumnShouldReturnExtractedValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( KEY_COLUMN, SEARCH_KEY_NULL_VALUE, VALUE_COLUMN, MAX_CORRELATION_SET_SIZE, false); - Assert.assertEquals(Collections.singleton(null), correlatedValues); + Assert.assertEquals(Optional.of(Collections.singleton(null)), correlatedValues); } @Test - public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnSearchValue() + public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnEmpty() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( VALUE_COLUMN, SEARCH_VALUE_VALUE, VALUE_COLUMN, 
MAX_CORRELATION_SET_SIZE, false); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); correlatedValues = target.getCorrelatedColumnValues( VALUE_COLUMN, SEARCH_VALUE_VALUE, KEY_COLUMN, 10, false); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnShouldReturnSearchValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( VALUE_COLUMN, SEARCH_VALUE_VALUE, VALUE_COLUMN, MAX_CORRELATION_SET_SIZE, true); - Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_VALUE_VALUE)), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnShouldReturnUnAppliedValue() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( VALUE_COLUMN, SEARCH_VALUE_VALUE, KEY_COLUMN, 10, true); - Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of(SEARCH_KEY_VALUE)), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnWithMaxLimitSetShouldHonorMaxLimit() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( VALUE_COLUMN, SEARCH_VALUE_VALUE, KEY_COLUMN, 0, true); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.empty(), correlatedValues); } @Test public void getCorrelatedColumnValuesForSearchUnknownValueAndRetrieveKeyColumnShouldReturnNoCorrelatedValues() { - Set correlatedValues = target.getCorrelatedColumnValues( + Optional> correlatedValues = target.getCorrelatedColumnValues( VALUE_COLUMN, 
SEARCH_VALUE_UNKNOWN, KEY_COLUMN, 10, true); - Assert.assertEquals(ImmutableSet.of(), correlatedValues); + Assert.assertEquals(Optional.of(ImmutableSet.of()), correlatedValues); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 39800c1911d..a12775078bf 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -162,7 +162,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase public static final String DUMMY_SQL_ID = "dummy"; public static final String LOS_ANGELES = "America/Los_Angeles"; - static final ImmutableMap.Builder DEFAULT_QUERY_CONTEXT_BUILDER = + private static final ImmutableMap.Builder DEFAULT_QUERY_CONTEXT_BUILDER = ImmutableMap.builder() .put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID) .put(PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z") diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 80330453aa8..87838e201d8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -8085,6 +8085,42 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + @Parameters(source = QueryContextForJoinProvider.class) + public void testFilterAndGroupByLookupUsingJoinOperatorWithValueFilterPushdownMatchesNothig(Map queryContext) throws Exception + { + // Cannot vectorize JOIN operator. 
+ cannotVectorize(); + + testQuery( + "SELECT lookyloo.k, COUNT(*)\n" + + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n" + + "WHERE lookyloo.v = '123'\n" + + "GROUP BY lookyloo.k", + queryContext, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new LookupDataSource("lookyloo"), + "j0.", + equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")), + JoinType.LEFT + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setDimFilter(selector("j0.v", "123", null)) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("j0.k", "d0"))) + .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) + .setContext(queryContext) + .build() + ), + ImmutableList.of() + ); + } + @Test @Parameters(source = QueryContextForJoinProvider.class) public void testFilterAndGroupByLookupUsingJoinOperatorAllowNulls(Map queryContext) throws Exception @@ -8217,6 +8253,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest { // Cannot vectorize JOIN operator. cannotVectorize(); + testQuery( "SELECT lookyloo.k, COUNT(*)\n" + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"