Refactor JoinFilterAnalyzer (#9921)

* Refactor JoinFilterAnalyzer

This patch attempts to make it easier to follow the join filter analysis code
with the hope of making it easier to add rewrite optimizations in the future.

To keep the patch small and easy to review, this is the first of at least 2
patches that are planned.

This patch adds a builder to the Pre-Analysis, so that it is easier to
instantiate the preAnalysis. It also moves some of the filter normalization
code out to Filters with associated tests.

* fix tests
This commit is contained in:
Suneet Saldanha 2020-05-28 22:32:09 -07:00 committed by GitHub
parent 880a7943b3
commit faef31a0af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 362 additions and 109 deletions

View File

@ -49,6 +49,7 @@ import org.apache.druid.segment.filter.cnf.HiveCnfHelper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@ -521,4 +522,25 @@ public class Filters
return new OrFilter(filterSet);
}
/**
 * Converts the given filter to conjunctive normal form and returns its top-level conjuncts.
 *
 * CNF conversion yields either an AND filter wrapping multiple subfilters, or a single
 * non-AND filter that cannot be decomposed any further.
 *
 * @param filter the filter to normalize
 * @return the set of OR-clauses (conjuncts) of the CNF form of {@code filter}; a singleton
 *         set when the CNF form is a single non-AND filter
 */
public static Set<Filter> toNormalizedOrClauses(Filter filter)
{
  final Filter cnf = Filters.toCnf(filter);
  // An AND at the top level can be split into independent pushdown candidates;
  // anything else is treated as one indivisible clause.
  return cnf instanceof AndFilter
         ? ((AndFilter) cnf).getFilters()
         : Collections.singleton(cnf);
}
}

View File

@ -30,7 +30,6 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
@ -42,7 +41,6 @@ import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@ -76,7 +74,7 @@ import java.util.Set;
*
* The result of this pre-analysis method should be passed into the next step of join filter analysis, described below.
*
* The {@link #splitFilter(JoinFilterPreAnalysis)} method takes the pre-analysis result and optionally applies the\
* The {@link #splitFilter(JoinFilterPreAnalysis)} method takes the pre-analysis result and optionally applies the
* filter rewrite and push down operations on a per-segment level.
*/
public class JoinFilterAnalyzer
@ -118,34 +116,15 @@ public class JoinFilterAnalyzer
final List<VirtualColumn> postJoinVirtualColumns = new ArrayList<>();
splitVirtualColumns(joinableClauses, virtualColumns, preJoinVirtualColumns, postJoinVirtualColumns);
JoinFilterPreAnalysis.Builder preAnalysisBuilder =
new JoinFilterPreAnalysis.Builder(joinableClauses, originalFilter, postJoinVirtualColumns)
.withEnableFilterPushDown(enableFilterPushDown)
.withEnableFilterRewrite(enableFilterRewrite);
if (originalFilter == null || !enableFilterPushDown) {
return new JoinFilterPreAnalysis(
joinableClauses,
originalFilter,
postJoinVirtualColumns,
null,
null,
null,
null,
enableFilterPushDown,
enableFilterRewrite,
Collections.emptyMap()
);
return preAnalysisBuilder.build();
}
Filter normalizedFilter = Filters.toCnf(originalFilter);
// List of candidates for pushdown
// CNF normalization will generate either
// - an AND filter with multiple subfilters
// - or a single non-AND subfilter which cannot be split further
Set<Filter> normalizedOrClauses;
if (normalizedFilter instanceof AndFilter) {
normalizedOrClauses = ((AndFilter) normalizedFilter).getFilters();
} else {
normalizedOrClauses = Collections.singleton(normalizedFilter);
}
Set<Filter> normalizedOrClauses = Filters.toNormalizedOrClauses(originalFilter);
List<Filter> normalizedBaseTableClauses = new ArrayList<>();
List<Filter> normalizedJoinTableClauses = new ArrayList<>();
@ -161,68 +140,17 @@ public class JoinFilterAnalyzer
normalizedBaseTableClauses.add(orClause);
}
}
preAnalysisBuilder
.withNormalizedBaseTableClauses(normalizedBaseTableClauses)
.withNormalizedJoinTableClauses(normalizedJoinTableClauses);
if (!enableFilterRewrite) {
return new JoinFilterPreAnalysis(
joinableClauses,
originalFilter,
postJoinVirtualColumns,
normalizedBaseTableClauses,
normalizedJoinTableClauses,
null,
null,
enableFilterPushDown,
enableFilterRewrite,
Collections.emptyMap()
);
return preAnalysisBuilder.build();
}
// build the equicondition map, used for determining how the tables are connected through joins
Map<String, Set<Expr>> equiconditions = new HashMap<>();
for (JoinableClause clause : joinableClauses) {
for (Equality equality : clause.getCondition().getEquiConditions()) {
Set<Expr> exprsForRhs = equiconditions.computeIfAbsent(
clause.getPrefix() + equality.getRightColumn(),
(rhs) -> new HashSet<>()
);
exprsForRhs.add(equality.getLeftExpr());
}
}
Map<String, Set<Expr>> equiconditions = preAnalysisBuilder.computeEquiconditionsFromJoinableClauses();
// Determine candidates for filter rewrites.
// A candidate is an RHS column that appears in a filter, along with the value being filtered on, plus
// the joinable clause associated with the table that the RHS column is from.
Set<RhsRewriteCandidate> rhsRewriteCandidates = new LinkedHashSet<>();
for (Filter orClause : normalizedJoinTableClauses) {
if (filterMatchesNull(orClause)) {
continue;
}
if (orClause instanceof OrFilter) {
for (Filter subFilter : ((OrFilter) orClause).getFilters()) {
Optional<RhsRewriteCandidate> rhsRewriteCandidate = determineRhsRewriteCandidatesForSingleFilter(
subFilter,
equiconditions,
joinableClauses
);
if (rhsRewriteCandidate.isPresent()) {
rhsRewriteCandidates.add(rhsRewriteCandidate.get());
}
}
continue;
}
Optional<RhsRewriteCandidate> rhsRewriteCandidate = determineRhsRewriteCandidatesForSingleFilter(
orClause,
equiconditions,
joinableClauses
);
if (rhsRewriteCandidate.isPresent()) {
rhsRewriteCandidates.add(rhsRewriteCandidate.get());
}
}
Set<RhsRewriteCandidate> rhsRewriteCandidates = getRhsRewriteCandidates(normalizedJoinTableClauses, equiconditions, joinableClauses);
// Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis based on the RHS rewrite candidates
Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> correlationsByPrefix = new HashMap<>();
@ -248,7 +176,6 @@ public class JoinFilterAnalyzer
assert (baseColumnAnalysis != null);
return Optional.of(correlatedBaseTableColumns.get().get(c));
}
}
);
} else {
@ -280,9 +207,7 @@ public class JoinFilterAnalyzer
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByDirectFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> {
return new ArrayList<>();
}
(rhsCol) -> new ArrayList<>()
);
perColumnCorrelations.add(
directRewriteCorrelations.get(rhsRewriteCandidate.getRhsColumn()).get()
@ -299,9 +224,7 @@ public class JoinFilterAnalyzer
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> {
return new ArrayList<>();
}
(rhsCol) -> new ArrayList<>()
);
perColumnCorrelations.add(correlationForPrefix.getValue());
correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent(
@ -348,20 +271,10 @@ public class JoinFilterAnalyzer
correlationsByDirectFilteringColumn.put(correlation.getKey(), dedupList);
}
}
preAnalysisBuilder.withCorrelationsByFilteringColumn(correlationsByFilteringColumn)
.withCorrelationsByDirectFilteringColumn(correlationsByDirectFilteringColumn);
return new JoinFilterPreAnalysis(
joinableClauses,
originalFilter,
postJoinVirtualColumns,
normalizedBaseTableClauses,
normalizedJoinTableClauses,
correlationsByFilteringColumn,
correlationsByDirectFilteringColumn,
enableFilterPushDown,
enableFilterRewrite,
equiconditions
);
return preAnalysisBuilder.build();
}
private static Optional<RhsRewriteCandidate> determineRhsRewriteCandidatesForSingleFilter(
@ -414,9 +327,7 @@ public class JoinFilterAnalyzer
{
if (requiredColumns.size() == 1) {
String reqColumn = requiredColumns.iterator().next();
if (equiconditions.containsKey(reqColumn)) {
return true;
}
return equiconditions.containsKey(reqColumn);
}
return false;
}
@ -1052,6 +963,57 @@ public class JoinFilterAnalyzer
}
}
/**
 * Determine candidates for filter rewrites.
 * A candidate is an RHS column that appears in a filter, along with the value being filtered on,
 * plus the joinable clause associated with the table that the RHS column is from.
 *
 * These candidates are later reduced to filter rewrite correlations.
 *
 * @param normalizedJoinTableClauses the join-table clauses of the normalized filter
 * @param equiconditions             map of RHS column -> LHS expressions from the join conditions
 * @param joinableClauses            the clauses participating in the join
 * @return the set of candidates for filter rewrites, in encounter order
 */
private static Set<RhsRewriteCandidate> getRhsRewriteCandidates(
    List<Filter> normalizedJoinTableClauses,
    Map<String, Set<Expr>> equiconditions,
    List<JoinableClause> joinableClauses)
{
  final Set<RhsRewriteCandidate> candidates = new LinkedHashSet<>();
  for (Filter clause : normalizedJoinTableClauses) {
    // Clauses that can match null are not rewrite-eligible.
    if (filterMatchesNull(clause)) {
      continue;
    }
    if (clause instanceof OrFilter) {
      // Examine each disjunct of an OR clause individually.
      for (Filter disjunct : ((OrFilter) clause).getFilters()) {
        determineRhsRewriteCandidatesForSingleFilter(disjunct, equiconditions, joinableClauses)
            .ifPresent(candidates::add);
      }
    } else {
      determineRhsRewriteCandidatesForSingleFilter(clause, equiconditions, joinableClauses)
          .ifPresent(candidates::add);
    }
  }
  return candidates;
}
/**
* A candidate is an RHS column that appears in a filter, along with the value being filtered on, plus
* the joinable clause associated with the table that the RHS column is from.
*/
private static class RhsRewriteCandidate
{
private final boolean isDirectRewrite;

View File

@ -22,8 +22,14 @@ package org.apache.druid.segment.join.filter;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.join.Equality;
import org.apache.druid.segment.join.JoinableClause;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -53,7 +59,7 @@ public class JoinFilterPreAnalysis
private final List<VirtualColumn> postJoinVirtualColumns;
private final Map<String, Set<Expr>> equiconditions;
public JoinFilterPreAnalysis(
private JoinFilterPreAnalysis(
final List<JoinableClause> joinableClauses,
final Filter originalFilter,
final List<VirtualColumn> postJoinVirtualColumns,
@ -127,5 +133,105 @@ public class JoinFilterPreAnalysis
{
return equiconditions;
}
/**
 * Fluent builder for {@link JoinFilterPreAnalysis}.
 *
 * The joinable clauses, original filter, and post-join virtual columns are fixed at
 * construction time; the remaining pieces of the analysis are optional and may be supplied
 * through the with* methods before calling {@link #build()}.
 */
public static class Builder
{
  // Required inputs, fixed at construction time.
  @Nonnull private final List<JoinableClause> joinableClauses;
  @Nullable private final Filter originalFilter;
  @Nonnull private final List<VirtualColumn> postJoinVirtualColumns;

  // Optional analysis results; these remain null (or empty for equiconditions) when the
  // corresponding phase of pre-analysis is skipped.
  @Nullable private List<Filter> normalizedBaseTableClauses;
  @Nullable private List<Filter> normalizedJoinTableClauses;
  @Nullable private Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn;
  @Nullable private Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn;
  private boolean enableFilterPushDown = false;
  private boolean enableFilterRewrite = false;
  @Nonnull private Map<String, Set<Expr>> equiconditions = Collections.emptyMap();

  public Builder(
      @Nonnull List<JoinableClause> joinableClauses,
      @Nullable Filter originalFilter,
      @Nonnull List<VirtualColumn> postJoinVirtualColumns
  )
  {
    this.joinableClauses = joinableClauses;
    this.originalFilter = originalFilter;
    this.postJoinVirtualColumns = postJoinVirtualColumns;
  }

  /** Sets the clauses of the normalized filter that apply to the base table. */
  public Builder withNormalizedBaseTableClauses(List<Filter> normalizedBaseTableClauses)
  {
    this.normalizedBaseTableClauses = normalizedBaseTableClauses;
    return this;
  }

  /** Sets the clauses of the normalized filter that apply to the join tables. */
  public Builder withNormalizedJoinTableClauses(List<Filter> normalizedJoinTableClauses)
  {
    this.normalizedJoinTableClauses = normalizedJoinTableClauses;
    return this;
  }

  /** Sets the per-column correlation analyses used for indirect filter rewrites. */
  public Builder withCorrelationsByFilteringColumn(
      Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn
  )
  {
    this.correlationsByFilteringColumn = correlationsByFilteringColumn;
    return this;
  }

  /** Sets the per-column correlation analyses used for direct filter rewrites. */
  public Builder withCorrelationsByDirectFilteringColumn(
      Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn
  )
  {
    this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn;
    return this;
  }

  /** Enables or disables filter push down for the built pre-analysis. */
  public Builder withEnableFilterPushDown(boolean enableFilterPushDown)
  {
    this.enableFilterPushDown = enableFilterPushDown;
    return this;
  }

  /** Enables or disables filter rewrites for the built pre-analysis. */
  public Builder withEnableFilterRewrite(boolean enableFilterRewrite)
  {
    this.enableFilterRewrite = enableFilterRewrite;
    return this;
  }

  /**
   * Derives the equicondition map from the joinable clauses, stores it on this builder, and
   * returns it for convenience. Keys are prefixed RHS column names; values are the sets of
   * LHS expressions each RHS column is equated with in the join conditions.
   */
  public Map<String, Set<Expr>> computeEquiconditionsFromJoinableClauses()
  {
    final Map<String, Set<Expr>> computed = new HashMap<>();
    for (JoinableClause clause : joinableClauses) {
      for (Equality equality : clause.getCondition().getEquiConditions()) {
        computed.computeIfAbsent(clause.getPrefix() + equality.getRightColumn(), rhs -> new HashSet<>())
                .add(equality.getLeftExpr());
      }
    }
    this.equiconditions = computed;
    return computed;
  }

  /** Assembles the immutable {@link JoinFilterPreAnalysis} from the accumulated state. */
  public JoinFilterPreAnalysis build()
  {
    return new JoinFilterPreAnalysis(
        joinableClauses,
        originalFilter,
        postJoinVirtualColumns,
        normalizedBaseTableClauses,
        normalizedJoinTableClauses,
        correlationsByFilteringColumn,
        correlationsByDirectFilteringColumn,
        enableFilterPushDown,
        enableFilterRewrite,
        equiconditions
    );
  }
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.filter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.dimension.DimensionSpec;
@ -35,6 +36,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -155,6 +157,51 @@ public class FilterCnfConversionTest
assertFilter(muchReducible, expected, cnf);
}
/**
 * Verifies that {@link Filters#toNormalizedOrClauses} flattens nested AND/OR wrappers and
 * eliminates duplicate clauses: the deeply nested input below reduces to just its two
 * distinct selector clauses.
 */
@Test
public void testToNormalizedOrClausesWithMuchReducibleFilter()
{
final Filter muchReducible = FilterTestUtils.and(
// should be flattened
FilterTestUtils.and(
FilterTestUtils.and(
FilterTestUtils.and(FilterTestUtils.selector("col1", "val1"))
)
),
// should be flattened
FilterTestUtils.and(
FilterTestUtils.or(
FilterTestUtils.and(FilterTestUtils.selector("col1", "val1"))
)
),
// should be flattened
FilterTestUtils.or(
FilterTestUtils.and(
FilterTestUtils.or(FilterTestUtils.selector("col1", "val1"))
)
),
// should eliminate duplicate filters
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val2"),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val2")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.and(
FilterTestUtils.selector("col2", "val2"),
FilterTestUtils.selector("col1", "val1")
)
)
);
// After CNF normalization only the two unique selectors should remain as conjuncts.
final Set<Filter> expected = ImmutableSet.of(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val2")
);
final Set<Filter> normalizedOrClauses = Filters.toNormalizedOrClauses(muchReducible);
Assert.assertEquals(expected, normalizedOrClauses);
}
@Test
public void testToCnfWithComplexFilterIncludingNotAndOr()
{
@ -259,6 +306,110 @@ public class FilterCnfConversionTest
assertFilter(filter, expected, cnf);
}
/**
 * Verifies that {@link Filters#toNormalizedOrClauses} converts a complex mix of AND, OR,
 * and NOT filters into the expected set of CNF conjuncts. Note the expected output is a
 * suboptimized CNF — see the inline comment below on the OR clauses that a smarter
 * reduction could eliminate.
 */
@Test
public void testToNormalizedOrClausesWithComplexFilterIncludingNotAndOr()
{
final Filter filter = FilterTestUtils.and(
FilterTestUtils.or(
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val2")
),
FilterTestUtils.not(
FilterTestUtils.and(
FilterTestUtils.selector("col4", "val4"),
FilterTestUtils.selector("col5", "val5")
)
)
),
FilterTestUtils.or(
FilterTestUtils.not(
FilterTestUtils.or(
FilterTestUtils.selector("col2", "val2"),
FilterTestUtils.selector("col4", "val4"),
FilterTestUtils.selector("col5", "val5")
)
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col3", "val3")
)
),
FilterTestUtils.and(
FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val22"), // selecting different value
FilterTestUtils.selector("col3", "val3")
),
FilterTestUtils.not(
FilterTestUtils.selector("col1", "val11")
)
),
FilterTestUtils.and(
FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val22"),
FilterTestUtils.selector("col3", "val3")
),
FilterTestUtils.not(
FilterTestUtils.selector("col1", "val11") // selecting different value
)
)
);
final Set<Filter> expected = ImmutableSet.of(
FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val22"),
FilterTestUtils.selector("col3", "val3")
),
FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.not(FilterTestUtils.selector("col2", "val2"))
),
FilterTestUtils.or(
FilterTestUtils.not(FilterTestUtils.selector("col2", "val2")),
FilterTestUtils.selector("col3", "val3")
),
FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.not(FilterTestUtils.selector("col4", "val4"))
),
FilterTestUtils.or(
FilterTestUtils.selector("col3", "val3"),
FilterTestUtils.not(FilterTestUtils.selector("col4", "val4"))
),
FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.not(FilterTestUtils.selector("col5", "val5"))
),
FilterTestUtils.or(
FilterTestUtils.selector("col3", "val3"),
FilterTestUtils.not(FilterTestUtils.selector("col5", "val5"))
),
FilterTestUtils.not(FilterTestUtils.selector("col1", "val11")),
// The below OR filter could be eliminated because this filter also has
// (col1 = val1 || ~(col4 = val4)) && (col1 = val1 || ~(col5 = val5)).
// The reduction process would be
// (col1 = val1 || ~(col4 = val4)) && (col1 = val1 || ~(col5 = val5)) && (col1 = val1 || ~(col4 = val4) || ~(col5 = val5))
// => (col1 = val1 && ~(col4 = val4) || ~(col5 = val5)) && (col1 = val1 || ~(col4 = val4) || ~(col5 = val5))
// => (col1 = val1 && ~(col4 = val4) || ~(col5 = val5))
// => (col1 = val1 || ~(col4 = val4)) && (col1 = val1 || ~(col5 = val5)).
// However, we don't have this reduction now, so we have a filter in a suboptimized CNF.
FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.not(FilterTestUtils.selector("col4", "val4")),
FilterTestUtils.not(FilterTestUtils.selector("col5", "val5"))
),
FilterTestUtils.or(
FilterTestUtils.selector("col2", "val2"),
FilterTestUtils.not(FilterTestUtils.selector("col4", "val4")),
FilterTestUtils.not(FilterTestUtils.selector("col5", "val5"))
)
);
final Set<Filter> normalizedOrClauses = Filters.toNormalizedOrClauses(filter);
Assert.assertEquals(expected, normalizedOrClauses);
}
@Test
public void testToCnfCollapsibleBigFilter()
{
@ -355,6 +506,18 @@ public class FilterCnfConversionTest
assertFilter(filter, expectedCnf, Filters.toCnf(filter));
}
/**
 * Verifies that a top-level OR filter — which CNF conversion cannot split further —
 * comes back from {@link Filters#toNormalizedOrClauses} as a singleton set.
 */
@Test
public void testToNormalizedOrClausesNonAndFilterShouldReturnSingleton()
{
  final Filter orFilter = FilterTestUtils.or(
      FilterTestUtils.selector("col1", "val1"),
      FilterTestUtils.selector("col2", "val2")
  );
  Assert.assertEquals(Collections.singleton(orFilter), Filters.toNormalizedOrClauses(orFilter));
}
@Test
public void testTrueFalseFilterRequiredColumnRewrite()
{