Directly rewrite filters on RHS join columns into LHS equivalents (#9818)

* Directly rewrite filters on RHS join columns into LHS equivalents

* PR comments

* Fix inspection

* Revert unnecessary ExprMacroTable change

* Fix build after merge

* Address PR comments
This commit is contained in:
Jonathan Wei 2020-05-08 23:45:35 -07:00 committed by GitHub
parent 28be107a1c
commit 16d293d6e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1389 additions and 220 deletions

View File

@ -28,6 +28,7 @@ import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Map;
import java.util.Set;
public interface Filter
@ -162,4 +163,29 @@ public interface Filter
* can be expected to have a bitmap index retrievable via {@link BitmapIndexSelector#getBitmapIndex(String)}
*/
Set<String> getRequiredColumns();
/**
 * Returns true if this filter is able to return a copy of this filter that is identical to this filter except that it
 * operates on different columns, based on a renaming map (see {@link #rewriteRequiredColumns}).
 */
default boolean supportsRequiredColumnRewrite()
{
return false;
}
/**
 * Return a copy of this filter that is identical to this filter except that it operates on different columns,
 * based on a renaming map where the key is the column to be renamed in the filter, and the value is the new
 * column name.
 *
 * For example, if I have a filter (A = hello), and I have a renaming map (A -> B),
 * this should return the filter (B = hello)
 *
 * @param columnRewrites Column rewrite map
 * @return Copy of this filter that operates on new columns based on the rewrite map
 *
 * @throws UnsupportedOperationException when the filter does not support rewrites,
 *         i.e. when {@link #supportsRequiredColumnRewrite()} returns false
 */
default Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
throw new UnsupportedOperationException("Required column rewrite is not supported by this filter.");
}
}

View File

@ -363,5 +363,26 @@ public class LikeDimFilter implements DimFilter
{
return suffixMatch;
}
@Override
public boolean equals(Object o)
{
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass()) {
    return false;
  }
  final LikeMatcher other = (LikeMatcher) o;
  // Pattern has no value-based equals, so the compiled regexes are compared via their source strings.
  return getSuffixMatch() == other.getSuffixMatch()
         && Objects.equals(getPrefix(), other.getPrefix())
         && Objects.equals(pattern.toString(), other.pattern.toString());
}
@Override
public int hashCode()
{
// Pattern does not override hashCode, so hash its string form to stay consistent with equals().
return Objects.hash(getSuffixMatch(), getPrefix(), pattern.toString());
}
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Supplier;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.extraction.ExtractionFn;
@ -47,6 +48,7 @@ import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -172,6 +174,39 @@ public class BoundFilter implements Filter
return boundDimFilter.getRequiredColumns();
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// A bound filter references exactly one dimension, which can be renamed in rewriteRequiredColumns.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
  final String oldDimension = boundDimFilter.getDimension();
  final String newDimension = columnRewrites.get(oldDimension);
  if (newDimension == null) {
    throw new IAE(
        "Received a non-applicable rewrite: %s, filter's dimension: %s",
        columnRewrites,
        oldDimension
    );
  }
  // Rebuild the underlying dim filter against the renamed column; bounds, strictness,
  // extraction fn, and ordering are carried over unchanged.
  return new BoundFilter(
      new BoundDimFilter(
          newDimension,
          boundDimFilter.getLower(),
          boundDimFilter.getUpper(),
          boundDimFilter.isLowerStrict(),
          boundDimFilter.isUpperStrict(),
          null,
          boundDimFilter.getExtractionFn(),
          boundDimFilter.getOrdering()
      )
  );
}
private static Pair<Integer, Integer> getStartEndIndexes(
final BoundDimFilter boundDimFilter,
final BitmapIndex bitmapIndex

View File

@ -40,17 +40,18 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Objects;
import java.util.Set;
/**
*/
public class DimensionPredicateFilter implements Filter
{
private final String dimension;
private final DruidPredicateFactory predicateFactory;
private final String basePredicateString;
private final ExtractionFn extractionFn;
private final FilterTuning filterTuning;
protected final String dimension;
protected final DruidPredicateFactory predicateFactory;
protected final String basePredicateString;
protected final ExtractionFn extractionFn;
protected final FilterTuning filterTuning;
public DimensionPredicateFilter(
final String dimension,
@ -218,4 +219,26 @@ public class DimensionPredicateFilter implements Filter
return StringUtils.format("%s = %s", dimension, basePredicateString);
}
}
@Override
public boolean equals(Object o)
{
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass()) {
    return false;
  }
  final DimensionPredicateFilter other = (DimensionPredicateFilter) o;
  // predicateFactory itself is not compared; basePredicateString stands in for it.
  return Objects.equals(dimension, other.dimension)
         && Objects.equals(basePredicateString, other.basePredicateString)
         && Objects.equals(extractionFn, other.extractionFn)
         && Objects.equals(filterTuning, other.filterTuning);
}
@Override
public int hashCode()
{
// Hashes the same fields that equals() compares (basePredicateString stands in for predicateFactory).
return Objects.hash(dimension, basePredicateString, extractionFn, filterTuning);
}
}

View File

@ -176,4 +176,11 @@ public class ExpressionFilter implements Filter
{
return requiredBindings.get();
}
// Expression filters cannot currently be rewritten onto renamed columns.
@Override
public boolean supportsRequiredColumnRewrite()
{
// We could support this, but need a good approach to rewriting the identifiers within an expression.
return false;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
public class FalseFilter implements Filter
@ -99,6 +100,18 @@ public class FalseFilter implements Filter
return Collections.emptySet();
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// FalseFilter references no columns, so any rewrite map trivially applies.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
// No columns to rename; the rewrite is the identity.
return this;
}
@Override
public String toString()
{

View File

@ -498,4 +498,27 @@ public class Filters
return new AndFilter(filterList);
}
/**
 * Combine a set of filters with a logical OR.
 *
 * @param filterSet Set of filters
 *
 * @return null when filterSet is empty; the sole element when filterSet has exactly one member;
 * otherwise an OR filter composed of the filters from filterSet
 */
@Nullable
public static Filter or(Set<Filter> filterSet)
{
  switch (filterSet.size()) {
    case 0:
      return null;
    case 1:
      return filterSet.iterator().next();
    default:
      return new OrFilter(filterSet);
  }
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntIterable;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.BitmapIndexSelector;
@ -45,6 +46,7 @@ import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -184,6 +186,31 @@ public class InFilter implements Filter
return ImmutableSet.of(dimension);
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// An IN filter references a single dimension, which can be renamed in rewriteRequiredColumns.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
  final String newDimension = columnRewrites.get(dimension);
  if (newDimension == null) {
    throw new IAE("Received a non-applicable rewrite: %s, filter's dimension: %s", columnRewrites, dimension);
  }
  // Reuse the value set and cached predicate suppliers; only the column name changes.
  return new InFilter(
      newDimension,
      values,
      longPredicateSupplier,
      floatPredicateSupplier,
      doublePredicateSupplier,
      extractionFn,
      filterTuning
  );
}
@Override
public boolean supportsBitmapIndex(BitmapIndexSelector selector)
{

View File

@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntIterable;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.BitmapIndexSelector;
@ -44,7 +45,9 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
public class LikeFilter implements Filter
@ -107,6 +110,33 @@ public class LikeFilter implements Filter
return ImmutableSet.of(dimension);
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// A LIKE filter references a single dimension, which can be renamed in rewriteRequiredColumns.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
  final String newDimension = columnRewrites.get(dimension);
  if (newDimension == null) {
    throw new IAE(
        "Received a non-applicable rewrite: %s, filter's dimension: %s",
        columnRewrites,
        dimension
    );
  }
  // Reuse the existing matcher, extraction fn, and tuning; only the column name changes.
  return new LikeFilter(newDimension, extractionFn, likeMatcher, filterTuning);
}
@Override
public boolean supportsBitmapIndex(BitmapIndexSelector selector)
{
@ -253,4 +283,26 @@ public class LikeFilter implements Filter
}
};
}
@Override
public boolean equals(Object o)
{
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass()) {
    return false;
  }
  final LikeFilter other = (LikeFilter) o;
  return Objects.equals(dimension, other.dimension)
         && Objects.equals(extractionFn, other.extractionFn)
         && Objects.equals(likeMatcher, other.likeMatcher)
         && Objects.equals(filterTuning, other.filterTuning);
}
@Override
public int hashCode()
{
// Hashes the same fields that equals() compares.
return Objects.hash(dimension, extractionFn, likeMatcher, filterTuning);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -111,6 +112,18 @@ public class NotFilter implements Filter
return baseFilter.getRequiredColumns();
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// NOT merely negates its base filter, so rewrite support is delegated to it.
return baseFilter.supportsRequiredColumnRewrite();
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
// Rewrite the wrapped filter and re-negate the result.
return new NotFilter(baseFilter.rewriteRequiredColumns(columnRewrites));
}
@Override
public boolean supportsBitmapIndex(BitmapIndexSelector selector)
{

View File

@ -20,19 +20,25 @@
package org.apache.druid.segment.filter;
import com.google.common.base.Predicate;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidDoublePredicate;
import org.apache.druid.query.filter.DruidFloatPredicate;
import org.apache.druid.query.filter.DruidLongPredicate;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.FilterTuning;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
/**
*/
public class RegexFilter extends DimensionPredicateFilter
{
private final Pattern pattern;
public RegexFilter(
final String dimension,
final Pattern pattern,
@ -79,5 +85,55 @@ public class RegexFilter extends DimensionPredicateFilter
extractionFn,
filterTuning
);
this.pattern = pattern;
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// A regex filter references a single dimension, which can be renamed in rewriteRequiredColumns.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
  final String newDimension = columnRewrites.get(dimension);
  if (newDimension == null) {
    throw new IAE(
        "Received a non-applicable rewrite: %s, filter's dimension: %s",
        columnRewrites,
        dimension
    );
  }
  // Reuse the compiled pattern, extraction fn, and tuning; only the column name changes.
  return new RegexFilter(newDimension, pattern, extractionFn, filterTuning);
}
@Override
public boolean equals(Object o)
{
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass() || !super.equals(o)) {
    return false;
  }
  final RegexFilter other = (RegexFilter) o;
  // Pattern has no value-based equals; compare the source regex strings instead.
  return Objects.equals(pattern.toString(), other.pattern.toString());
}
@Override
public int hashCode()
{
// Pattern lacks a value-based hashCode; use its string form, consistent with equals().
return Objects.hash(super.hashCode(), pattern.toString());
}
}

View File

@ -22,18 +22,25 @@ package org.apache.druid.segment.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidDoublePredicate;
import org.apache.druid.query.filter.DruidFloatPredicate;
import org.apache.druid.query.filter.DruidLongPredicate;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.FilterTuning;
import org.apache.druid.query.search.SearchQuerySpec;
import java.util.Map;
import java.util.Objects;
/**
*/
public class SearchQueryFilter extends DimensionPredicateFilter
{
private final SearchQuerySpec query;
@JsonCreator
public SearchQueryFilter(
@JsonProperty("dimension") final String dimension,
@ -69,9 +76,68 @@ public class SearchQueryFilter extends DimensionPredicateFilter
{
return input -> query.accept(String.valueOf(input));
}
@Override
public String toString()
{
return "SearchFilter{" +
"query='" + query + '\'' +
'}';
}
},
extractionFn,
filterTuning
);
this.query = query;
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// A search-query filter references a single dimension, which can be renamed in rewriteRequiredColumns.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
  final String newDimension = columnRewrites.get(dimension);
  if (newDimension == null) {
    throw new IAE(
        "Received a non-applicable rewrite: %s, filter's dimension: %s",
        columnRewrites,
        dimension
    );
  }
  // Same query spec, extraction fn, and tuning; only the column name changes.
  return new SearchQueryFilter(newDimension, query, extractionFn, filterTuning);
}
@Override
public boolean equals(Object o)
{
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass() || !super.equals(o)) {
    return false;
  }
  final SearchQueryFilter other = (SearchQueryFilter) o;
  return Objects.equals(query, other.query);
}
@Override
public int hashCode()
{
// Combine the parent's hash (dimension, basePredicateString, etc.) with the search query spec.
return Objects.hash(super.hashCode(), query);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.filter;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.filter.BitmapIndexSelector;
@ -34,6 +35,7 @@ import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -128,6 +130,31 @@ public class SelectorFilter implements Filter
return ImmutableSet.of(dimension);
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// A selector filter references a single dimension, which can be renamed in rewriteRequiredColumns.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
  final String newDimension = columnRewrites.get(dimension);
  if (newDimension == null) {
    throw new IAE(
        "Received a non-applicable rewrite: %s, filter's dimension: %s",
        columnRewrites,
        dimension
    );
  }
  // NOTE(review): filterTuning is not carried over to the rewritten filter — confirm this is intentional.
  return new SelectorFilter(newDimension, value);
}
@Override
public String toString()
{

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/**
@ -95,6 +96,18 @@ public class TrueFilter implements Filter
return Collections.emptySet();
}
@Override
public boolean supportsRequiredColumnRewrite()
{
// TrueFilter references no columns, so any rewrite map trivially applies.
return true;
}
@Override
public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
{
// No columns to rename; the rewrite is the identity.
return this;
}
@Override
public double estimateSelectivity(BitmapIndexSelector indexSelector)
{

View File

@ -19,12 +19,9 @@
package org.apache.druid.segment.join.filter;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
/**
@ -40,19 +37,16 @@ public class JoinFilterAnalysis
private final boolean retainAfterJoin;
private final Filter originalFilter;
private final Optional<Filter> pushDownFilter;
private final List<VirtualColumn> pushDownVirtualColumns;
public JoinFilterAnalysis(
boolean retainAfterJoin,
Filter originalFilter,
@Nullable Filter pushDownFilter,
List<VirtualColumn> pushDownVirtualColumns
@Nullable Filter pushDownFilter
)
{
this.retainAfterJoin = retainAfterJoin;
this.originalFilter = originalFilter;
this.pushDownFilter = pushDownFilter == null ? Optional.empty() : Optional.of(pushDownFilter);
this.pushDownVirtualColumns = pushDownVirtualColumns;
}
public boolean isCanPushDown()
@ -75,11 +69,6 @@ public class JoinFilterAnalysis
return pushDownFilter;
}
public List<VirtualColumn> getPushDownVirtualColumns()
{
return pushDownVirtualColumns;
}
/**
* Utility method for generating an analysis that represents: "Filter cannot be pushed down"
*
@ -92,8 +81,7 @@ public class JoinFilterAnalysis
return new JoinFilterAnalysis(
true,
originalFilter,
null,
ImmutableList.of()
null
);
}
}

View File

@ -19,7 +19,8 @@
package org.apache.druid.segment.join.filter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.query.filter.Filter;
@ -44,6 +45,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -60,7 +63,7 @@ import java.util.Set;
* A filter clause can be pushed down if it meets one of the following conditions:
* - The filter only applies to columns from the base table
* - The filter applies to columns from the join table, and we determine that the filter can be rewritten
* into a filter on columns from the base table
* into a filter on columns from the base table
*
* For the second case, where we rewrite filter clauses, the rewritten clause can be less selective than the original,
* so we preserve the original clause in the post-join filtering phase.
@ -86,7 +89,7 @@ public class JoinFilterAnalyzer
* where we convert the query filter (if any) into conjunctive normal form and then
* determine the structure of RHS filter rewrites (if any), since this information is shared across all
* per-segment operations.
*
*
* See {@link JoinFilterPreAnalysis} for details on the result of this pre-analysis step.
*
* @param joinableClauses The joinable clauses from the query
@ -124,8 +127,10 @@ public class JoinFilterAnalyzer
null,
null,
null,
null,
enableFilterPushDown,
enableFilterRewrite
enableFilterRewrite,
Collections.emptyMap()
);
}
@ -165,8 +170,10 @@ public class JoinFilterAnalyzer
normalizedBaseTableClauses,
normalizedJoinTableClauses,
null,
null,
enableFilterPushDown,
enableFilterRewrite
enableFilterRewrite,
Collections.emptyMap()
);
}
@ -182,64 +189,79 @@ public class JoinFilterAnalyzer
}
}
Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> correlationsByPrefix = new HashMap<>();
// Determine candidates for filter rewrites.
// A candidate is an RHS column that appears in a filter, along with the value being filtered on, plus
// the joinable clause associated with the table that the RHS column is from.
Set<RhsRewriteCandidate> rhsRewriteCandidates = new HashSet<>();
Set<RhsRewriteCandidate> rhsRewriteCandidates = new LinkedHashSet<>();
for (Filter orClause : normalizedJoinTableClauses) {
if (filterMatchesNull(orClause)) {
continue;
}
if (orClause instanceof SelectorFilter) {
// this is a candidate for RHS filter rewrite, determine column correlations and correlated values
String reqColumn = ((SelectorFilter) orClause).getDimension();
String reqValue = ((SelectorFilter) orClause).getValue();
JoinableClause joinableClause = isColumnFromJoin(joinableClauses, reqColumn);
if (joinableClause != null) {
rhsRewriteCandidates.add(
new RhsRewriteCandidate(
joinableClause,
reqColumn,
reqValue
)
);
}
}
if (orClause instanceof OrFilter) {
for (Filter subFilter : ((OrFilter) orClause).getFilters()) {
if (subFilter instanceof SelectorFilter) {
String reqColumn = ((SelectorFilter) subFilter).getDimension();
String reqValue = ((SelectorFilter) subFilter).getValue();
JoinableClause joinableClause = isColumnFromJoin(joinableClauses, reqColumn);
if (joinableClause != null) {
rhsRewriteCandidates.add(
new RhsRewriteCandidate(
joinableClause,
reqColumn,
reqValue
)
);
}
Optional<RhsRewriteCandidate> rhsRewriteCandidate = determineRhsRewriteCandidatesForSingleFilter(
subFilter,
equiconditions,
joinableClauses
);
if (rhsRewriteCandidate.isPresent()) {
rhsRewriteCandidates.add(rhsRewriteCandidate.get());
}
}
continue;
}
Optional<RhsRewriteCandidate> rhsRewriteCandidate = determineRhsRewriteCandidatesForSingleFilter(
orClause,
equiconditions,
joinableClauses
);
if (rhsRewriteCandidate.isPresent()) {
rhsRewriteCandidates.add(rhsRewriteCandidate.get());
}
}
// Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis based on the RHS rewrite candidates
Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> correlationsByPrefix = new HashMap<>();
Map<String, Optional<JoinFilterColumnCorrelationAnalysis>> directRewriteCorrelations = new HashMap<>();
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates) {
correlationsByPrefix.computeIfAbsent(
rhsRewriteCandidate.getJoinableClause().getPrefix(),
p -> findCorrelatedBaseTableColumns(
joinableClauses,
p,
rhsRewriteCandidate.getJoinableClause(),
equiconditions
)
);
if (rhsRewriteCandidate.isDirectRewrite()) {
directRewriteCorrelations.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
c -> {
Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> correlatedBaseTableColumns =
findCorrelatedBaseTableColumns(
joinableClauses,
c,
rhsRewriteCandidate,
equiconditions
);
if (!correlatedBaseTableColumns.isPresent()) {
return Optional.empty();
} else {
JoinFilterColumnCorrelationAnalysis baseColumnAnalysis = correlatedBaseTableColumns.get().get(c);
// for direct rewrites, there will only be one analysis keyed by the RHS column
assert (baseColumnAnalysis != null);
return Optional.of(correlatedBaseTableColumns.get().get(c));
}
}
);
} else {
correlationsByPrefix.computeIfAbsent(
rhsRewriteCandidate.getJoinableClause().getPrefix(),
p -> findCorrelatedBaseTableColumns(
joinableClauses,
p,
rhsRewriteCandidate,
equiconditions
)
);
}
}
// Using the RHS table prefix -> JoinFilterColumnCorrelationAnalysis created in the previous step,
@ -248,23 +270,40 @@ public class JoinFilterAnalyzer
// JoinFilterColumnCorrelationAnalysis objects, which are shared across all rhsFilterColumn entries that belong
// to the same RHS table.
//
// The value is a List<JoinFilterColumnCorreationAnalysis> instead of a single value because a table can be joined
// The value is a List<JoinFilterColumnCorrelationAnalysis> instead of a single value because a table can be joined
// to another via multiple columns.
// (See JoinFilterAnalyzerTest.test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS for an example)
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationsByFilteringColumn = new HashMap<>();
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn = new LinkedHashMap<>();
Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn = new LinkedHashMap<>();
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates) {
if (rhsRewriteCandidate.isDirectRewrite()) {
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByDirectFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> {
return new ArrayList<>();
}
);
perColumnCorrelations.add(
directRewriteCorrelations.get(rhsRewriteCandidate.getRhsColumn()).get()
);
continue;
}
Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> correlationsForPrefix = correlationsByPrefix.get(
rhsRewriteCandidate.getJoinableClause().getPrefix()
);
if (correlationsForPrefix.isPresent()) {
for (Map.Entry<String, JoinFilterColumnCorrelationAnalysis> correlationForPrefix : correlationsForPrefix.get()
.entrySet()) {
Optional<List<JoinFilterColumnCorrelationAnalysis>> perColumnCorrelations =
List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
correlationsByFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> Optional.of(new ArrayList<>())
(rhsCol) -> {
return new ArrayList<>();
}
);
perColumnCorrelations.get().add(correlationForPrefix.getValue());
perColumnCorrelations.add(correlationForPrefix.getValue());
correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent(
Pair.of(rhsRewriteCandidate.getRhsColumn(), rhsRewriteCandidate.getValueForRewrite()),
(rhsVal) -> {
@ -286,19 +325,30 @@ public class JoinFilterAnalyzer
);
}
} else {
correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), Optional.empty());
correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), null);
}
}
// Go through each per-column analysis list and prune duplicates
for (Map.Entry<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlation : correlationsByFilteringColumn.entrySet()) {
if (correlation.getValue().isPresent()) {
for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> correlation : correlationsByFilteringColumn
.entrySet()) {
if (correlation.getValue() != null) {
List<JoinFilterColumnCorrelationAnalysis> dedupList = eliminateCorrelationDuplicates(
correlation.getValue().get()
correlation.getValue()
);
correlationsByFilteringColumn.put(correlation.getKey(), Optional.of(dedupList));
correlationsByFilteringColumn.put(correlation.getKey(), dedupList);
}
}
for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> correlation : correlationsByDirectFilteringColumn
.entrySet()) {
if (correlation.getValue() != null) {
List<JoinFilterColumnCorrelationAnalysis> dedupList = eliminateCorrelationDuplicates(
correlation.getValue()
);
correlationsByDirectFilteringColumn.put(correlation.getKey(), dedupList);
}
}
return new JoinFilterPreAnalysis(
joinableClauses,
@ -307,13 +357,73 @@ public class JoinFilterAnalyzer
normalizedBaseTableClauses,
normalizedJoinTableClauses,
correlationsByFilteringColumn,
correlationsByDirectFilteringColumn,
enableFilterPushDown,
enableFilterRewrite
enableFilterRewrite,
equiconditions
);
}
/**
 * Inspect a single clause from the CNF-normalized filter and decide whether it is a candidate for an
 * RHS-to-LHS filter rewrite.
 *
 * Two candidate shapes are recognized:
 * - a rewrite-capable filter whose single required column is a join equicondition key (a "direct" rewrite)
 * - a SelectorFilter on a column that comes from a join table (a value-correlation rewrite)
 *
 * @param orClause         Individual clause (an OR filter or a leaf filter) from the CNF filter
 * @param equiconditions   Map of equicondition column name to the corresponding equicondition expressions
 * @param joinableClauses  The joinable clauses from the query
 *
 * @return A rewrite candidate when the clause qualifies, otherwise Optional.empty()
 */
private static Optional<RhsRewriteCandidate> determineRhsRewriteCandidatesForSingleFilter(
Filter orClause,
Map<String, Set<Expr>> equiconditions,
List<JoinableClause> joinableClauses
)
{
// Check if the filter clause is on the RHS join column. If so, we can rewrite the clause to filter on the
// LHS join column instead.
// Currently, we only support rewrites of filters that operate on a single column for simplicity.
Set<String> requiredColumns = orClause.getRequiredColumns();
if (orClause.supportsRequiredColumnRewrite() &&
doesRequiredColumnSetSupportDirectJoinFilterRewrite(requiredColumns, equiconditions)) {
String reqColumn = requiredColumns.iterator().next();
// NOTE(review): unlike the SelectorFilter branch below, joinableClause is not null-checked here —
// confirm an equicondition key is always a join-table column.
JoinableClause joinableClause = isColumnFromJoin(joinableClauses, reqColumn);
return Optional.of(
new RhsRewriteCandidate(
joinableClause,
reqColumn,
null,
true
)
);
} else if (orClause instanceof SelectorFilter) {
// this is a candidate for RHS filter rewrite, determine column correlations and correlated values
String reqColumn = ((SelectorFilter) orClause).getDimension();
String reqValue = ((SelectorFilter) orClause).getValue();
JoinableClause joinableClause = isColumnFromJoin(joinableClauses, reqColumn);
if (joinableClause != null) {
return Optional.of(
new RhsRewriteCandidate(
joinableClause,
reqColumn,
reqValue,
false
)
);
}
}
return Optional.empty();
}
/**
 * A direct join-filter rewrite is possible only for filters that reference exactly one column,
 * and only when that column is a key in the join equicondition map.
 */
private static boolean doesRequiredColumnSetSupportDirectJoinFilterRewrite(
    Set<String> requiredColumns,
    Map<String, Set<Expr>> equiconditions
)
{
  return requiredColumns.size() == 1
         && equiconditions.containsKey(requiredColumns.iterator().next());
}
/**
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis)}
*
* @return A JoinFilterSplit indicating what parts of the filter should be applied pre-join and post-join
*/
public static JoinFilterSplit splitFilter(
@ -324,14 +434,14 @@ public class JoinFilterAnalyzer
return new JoinFilterSplit(
null,
joinFilterPreAnalysis.getOriginalFilter(),
ImmutableList.of()
ImmutableSet.of()
);
}
// Pushdown filters, rewriting if necessary
List<Filter> leftFilters = new ArrayList<>();
List<Filter> rightFilters = new ArrayList<>();
List<VirtualColumn> pushDownVirtualColumns = new ArrayList<>();
Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs = new HashMap<>();
for (Filter baseTableFilter : joinFilterPreAnalysis.getNormalizedBaseTableClauses()) {
if (!filterMatchesNull(baseTableFilter)) {
@ -344,14 +454,12 @@ public class JoinFilterAnalyzer
for (Filter orClause : joinFilterPreAnalysis.getNormalizedJoinTableClauses()) {
JoinFilterAnalysis joinFilterAnalysis = analyzeJoinFilterClause(
orClause,
joinFilterPreAnalysis
joinFilterPreAnalysis,
pushDownVirtualColumnsForLhsExprs
);
if (joinFilterAnalysis.isCanPushDown()) {
//noinspection OptionalGetWithoutIsPresent isCanPushDown checks isPresent
leftFilters.add(joinFilterAnalysis.getPushDownFilter().get());
if (!joinFilterAnalysis.getPushDownVirtualColumns().isEmpty()) {
pushDownVirtualColumns.addAll(joinFilterAnalysis.getPushDownVirtualColumns());
}
}
if (joinFilterAnalysis.isRetainAfterJoin()) {
rightFilters.add(joinFilterAnalysis.getOriginalFilter());
@ -361,7 +469,7 @@ public class JoinFilterAnalyzer
return new JoinFilterSplit(
Filters.and(leftFilters),
Filters.and(rightFilters),
pushDownVirtualColumns
new HashSet<>(pushDownVirtualColumnsForLhsExprs.values())
);
}
@ -370,14 +478,23 @@ public class JoinFilterAnalyzer
* Analyze a filter clause from a filter that is in conjunctive normal form (AND of ORs).
* The clause is expected to be an OR filter or a leaf filter.
*
* @param filterClause Individual filter clause (an OR filter or a leaf filter) from a filter that is in CNF
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis)}
* @param filterClause Individual filter clause (an OR filter or a leaf filter) from a filter that is in CNF
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis)}
* @param pushDownVirtualColumnsForLhsExprs Used when there are LHS expressions in the join equiconditions.
* If we rewrite an RHS filter such that it applies to the LHS expression instead,
* because the expression existed only in the equicondition, we must create a virtual column
* on the LHS with the same expression in order to apply the filter.
* The specific rewriting methods such as {@link #rewriteSelectorFilter} will use this
* as a cache for virtual columns that they need to created, keyed by the expression, so that
* they can avoid creating redundant virtual columns.
*
*
* @return a JoinFilterAnalysis that contains a possible filter rewrite and information on how to handle the filter.
*/
private static JoinFilterAnalysis analyzeJoinFilterClause(
Filter filterClause,
JoinFilterPreAnalysis joinFilterPreAnalysis
JoinFilterPreAnalysis joinFilterPreAnalysis,
Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
)
{
// NULL matching conditions are not currently pushed down.
@ -387,87 +504,178 @@ public class JoinFilterAnalyzer
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
if (filterClause instanceof OrFilter) {
return rewriteOrFilter(
(OrFilter) filterClause,
joinFilterPreAnalysis,
pushDownVirtualColumnsForLhsExprs
);
}
if (filterClause.supportsRequiredColumnRewrite() && doesRequiredColumnSetSupportDirectJoinFilterRewrite(
filterClause.getRequiredColumns(),
joinFilterPreAnalysis.getEquiconditions()
)) {
return rewriteFilterDirect(
filterClause,
joinFilterPreAnalysis,
pushDownVirtualColumnsForLhsExprs
);
}
// Currently we only support rewrites of selector filters and selector filters within OR filters.
if (filterClause instanceof SelectorFilter) {
return rewriteSelectorFilter(
(SelectorFilter) filterClause,
joinFilterPreAnalysis
);
}
if (filterClause instanceof OrFilter) {
return rewriteOrFilter(
(OrFilter) filterClause,
joinFilterPreAnalysis
joinFilterPreAnalysis,
pushDownVirtualColumnsForLhsExprs
);
}
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
private static JoinFilterAnalysis rewriteFilterDirect(
Filter filterClause,
JoinFilterPreAnalysis joinFilterPreAnalysis,
Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
)
{
if (!filterClause.supportsRequiredColumnRewrite()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
List<Filter> newFilters = new ArrayList<>();
// we only support direct rewrites of filters that reference a single column
String reqColumn = filterClause.getRequiredColumns().iterator().next();
List<JoinFilterColumnCorrelationAnalysis> correlationAnalyses = joinFilterPreAnalysis.getCorrelationsByDirectFilteringColumn()
.get(reqColumn);
if (correlationAnalyses == null) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : correlationAnalyses) {
if (correlationAnalysis.supportsPushDown()) {
for (String correlatedBaseColumn : correlationAnalysis.getBaseColumns()) {
Filter rewrittenFilter = filterClause.rewriteRequiredColumns(ImmutableMap.of(
reqColumn,
correlatedBaseColumn
));
newFilters.add(rewrittenFilter);
}
for (Expr correlatedBaseExpr : correlationAnalysis.getBaseExpressions()) {
// We need to create a virtual column for the expressions when pushing down
VirtualColumn pushDownVirtualColumn = pushDownVirtualColumnsForLhsExprs.computeIfAbsent(
correlatedBaseExpr,
(expr) -> {
String vcName = getCorrelatedBaseExprVirtualColumnName(pushDownVirtualColumnsForLhsExprs.size());
return new ExpressionVirtualColumn(
vcName,
correlatedBaseExpr,
ValueType.STRING
);
}
);
Filter rewrittenFilter = filterClause.rewriteRequiredColumns(ImmutableMap.of(
reqColumn,
pushDownVirtualColumn.getOutputName()
));
newFilters.add(rewrittenFilter);
}
}
}
if (newFilters.isEmpty()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
return new JoinFilterAnalysis(
false,
filterClause,
Filters.and(newFilters)
);
}
/**
* Potentially rewrite the subfilters of an OR filter so that the whole OR filter can be pushed down to
* the base table.
*
* @param orFilter OrFilter to be rewritten
* @param orFilter OrFilter to be rewritten
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis)}
*
* @param pushDownVirtualColumnsForLhsExprs See comments on {@link #analyzeJoinFilterClause}
* @return A JoinFilterAnalysis indicating how to handle the potentially rewritten filter
*/
private static JoinFilterAnalysis rewriteOrFilter(
OrFilter orFilter,
JoinFilterPreAnalysis joinFilterPreAnalysis
JoinFilterPreAnalysis joinFilterPreAnalysis,
Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
)
{
boolean retainRhs = false;
Set<Filter> newFilters = new HashSet<>();
boolean retainRhs = false;
for (Filter filter : orFilter.getFilters()) {
if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(), filter.getRequiredColumns())) {
newFilters.add(filter);
continue;
}
retainRhs = true;
if (filter instanceof SelectorFilter) {
JoinFilterAnalysis rewritten = rewriteSelectorFilter(
(SelectorFilter) filter,
joinFilterPreAnalysis
JoinFilterAnalysis rewritten = null;
if (doesRequiredColumnSetSupportDirectJoinFilterRewrite(
filter.getRequiredColumns(),
joinFilterPreAnalysis.getEquiconditions()
)) {
rewritten = rewriteFilterDirect(
filter,
joinFilterPreAnalysis,
pushDownVirtualColumnsForLhsExprs
);
if (!rewritten.isCanPushDown()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
} else {
//noinspection OptionalGetWithoutIsPresent isCanPushDown checks isPresent
newFilters.add(rewritten.getPushDownFilter().get());
}
} else {
} else if (filter instanceof SelectorFilter) {
retainRhs = true;
// We could optimize retainRhs handling further by introducing a "filter to retain" property to the
// analysis, and only keeping the subfilters that need to be retained
rewritten = rewriteSelectorFilter(
(SelectorFilter) filter,
joinFilterPreAnalysis,
pushDownVirtualColumnsForLhsExprs
);
}
if (rewritten == null || !rewritten.isCanPushDown()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
} else {
//noinspection OptionalGetWithoutIsPresent isCanPushDown checks isPresent
newFilters.add(rewritten.getPushDownFilter().get());
}
}
return new JoinFilterAnalysis(
retainRhs,
orFilter,
new OrFilter(newFilters),
ImmutableList.of()
Filters.or(newFilters)
);
}
/**
* Rewrites a selector filter on a join table into an IN filter on the base table.
*
* @param selectorFilter SelectorFilter to be rewritten
* @param selectorFilter SelectorFilter to be rewritten
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis)}
*
* @param pushDownVirtualColumnsForLhsExprs See comments on {@link #analyzeJoinFilterClause}
* @return A JoinFilterAnalysis that indicates how to handle the potentially rewritten filter
*/
private static JoinFilterAnalysis rewriteSelectorFilter(
SelectorFilter selectorFilter,
JoinFilterPreAnalysis joinFilterPreAnalysis
JoinFilterPreAnalysis joinFilterPreAnalysis,
Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
)
{
List<Filter> newFilters = new ArrayList<>();
List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
String filteringColumn = selectorFilter.getDimension();
String filteringValue = selectorFilter.getValue();
@ -481,22 +689,20 @@ public class JoinFilterAnalyzer
if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(), selectorFilter.getRequiredColumns())) {
return new JoinFilterAnalysis(
true,
false,
selectorFilter,
selectorFilter,
pushdownVirtualColumns
selectorFilter
);
}
Optional<List<JoinFilterColumnCorrelationAnalysis>> correlationAnalyses = joinFilterPreAnalysis.getCorrelationsByFilteringColumn()
.get(filteringColumn);
List<JoinFilterColumnCorrelationAnalysis> correlationAnalyses = joinFilterPreAnalysis.getCorrelationsByFilteringColumn()
.get(filteringColumn);
if (!correlationAnalyses.isPresent()) {
if (correlationAnalyses == null) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : correlationAnalyses.get()) {
for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : correlationAnalyses) {
if (correlationAnalysis.supportsPushDown()) {
Optional<Set<String>> correlatedValues = correlationAnalysis.getCorrelatedValuesMap().get(
Pair.of(filteringColumn, filteringValue)
@ -518,17 +724,20 @@ public class JoinFilterAnalyzer
for (Expr correlatedBaseExpr : correlationAnalysis.getBaseExpressions()) {
// We need to create a virtual column for the expressions when pushing down
String vcName = getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size());
VirtualColumn correlatedBaseExprVirtualColumn = new ExpressionVirtualColumn(
vcName,
VirtualColumn pushDownVirtualColumn = pushDownVirtualColumnsForLhsExprs.computeIfAbsent(
correlatedBaseExpr,
ValueType.STRING
(expr) -> {
String vcName = getCorrelatedBaseExprVirtualColumnName(pushDownVirtualColumnsForLhsExprs.size());
return new ExpressionVirtualColumn(
vcName,
correlatedBaseExpr,
ValueType.STRING
);
}
);
pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn);
Filter rewrittenFilter = new InDimFilter(
vcName,
pushDownVirtualColumn.getOutputName(),
correlatedValues.get(),
null,
null
@ -545,8 +754,7 @@ public class JoinFilterAnalyzer
return new JoinFilterAnalysis(
true,
selectorFilter,
Filters.and(newFilters),
pushdownVirtualColumns
Filters.and(newFilters)
);
}
@ -593,29 +801,30 @@ public class JoinFilterAnalyzer
* For each rhs column that appears in the equiconditions for a table's JoinableClause,
* we try to determine what base table columns are related to the rhs column through the total set of equiconditions.
* We do this by searching backwards through the chain of join equiconditions using the provided equicondition map.
*
*
* For example, suppose we have 3 tables, A,B,C, joined with the following conditions, where A is the base table:
* A.joinColumn == B.joinColumn
* B.joinColum == C.joinColumn
*
* A.joinColumn == B.joinColumn
* B.joinColum == C.joinColumn
*
* We would determine that C.joinColumn is correlated with A.joinColumn: we first see that
* C.joinColumn is linked to B.joinColumn which in turn is linked to A.joinColumn
*
*
* Suppose we had the following join conditions instead:
* f(A.joinColumn) == B.joinColumn
* B.joinColum == C.joinColumn
* f(A.joinColumn) == B.joinColumn
* B.joinColum == C.joinColumn
* In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn would be linked to f(A.joinColumn).
*
*
* Suppose we had the following join conditions instead:
* A.joinColumn == B.joinColumn
* f(B.joinColum) == C.joinColumn
*
* A.joinColumn == B.joinColumn
* f(B.joinColum) == C.joinColumn
*
* Because we cannot reverse the function f() applied to the second table B in all cases,
* we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn
*
* @param tablePrefix Prefix for a join table
* @param clauseForTablePrefix Joinable clause for the prefix
* @param equiConditions Map of equiconditions, keyed by the right hand columns
* @param joinableClauses List of joinable clauses for the query
* @param tablePrefix Prefix for a join table
* @param rhsRewriteCandidate RHS rewrite candidate that we find correlated base table columns for
* @param equiConditions Map of equiconditions, keyed by the right hand columns
*
* @return A list of correlatation analyses for the equicondition RHS columns that reside in the table associated with
* the tablePrefix
@ -623,18 +832,24 @@ public class JoinFilterAnalyzer
private static Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> findCorrelatedBaseTableColumns(
List<JoinableClause> joinableClauses,
String tablePrefix,
JoinableClause clauseForTablePrefix,
RhsRewriteCandidate rhsRewriteCandidate,
Map<String, Set<Expr>> equiConditions
)
{
JoinableClause clauseForTablePrefix = rhsRewriteCandidate.getJoinableClause();
JoinConditionAnalysis jca = clauseForTablePrefix.getCondition();
Set<String> rhsColumns = new HashSet<>();
for (Equality eq : jca.getEquiConditions()) {
rhsColumns.add(tablePrefix + eq.getRightColumn());
if (rhsRewriteCandidate.isDirectRewrite()) {
// If we filter on a RHS join column, we only need to consider that column from the RHS side
rhsColumns.add(rhsRewriteCandidate.getRhsColumn());
} else {
for (Equality eq : jca.getEquiConditions()) {
rhsColumns.add(tablePrefix + eq.getRightColumn());
}
}
Map<String, JoinFilterColumnCorrelationAnalysis> correlations = new HashMap<>();
Map<String, JoinFilterColumnCorrelationAnalysis> correlations = new LinkedHashMap<>();
for (String rhsColumn : rhsColumns) {
Set<String> correlatedBaseColumns = new HashSet<>();
@ -674,9 +889,9 @@ public class JoinFilterAnalyzer
* and/or expressions for a single RHS column and adds them to the provided sets as it traverses the
* equicondition column relationships.
*
* @param equiConditions Map of equiconditions, keyed by the right hand columns
* @param rhsColumn RHS column to find base table correlations for
* @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified.
* @param equiConditions Map of equiconditions, keyed by the right hand columns
* @param rhsColumn RHS column to find base table correlations for
* @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified.
* @param correlatedBaseExpressions Set of correlated base column expressions for the provided RHS column. Will be
* modified.
*/
@ -747,9 +962,15 @@ public class JoinFilterAnalyzer
List<JoinFilterColumnCorrelationAnalysis> originalList
)
{
Map<List<String>, JoinFilterColumnCorrelationAnalysis> uniquesMap = new HashMap<>();
Map<Set<String>, JoinFilterColumnCorrelationAnalysis> uniquesMap = new HashMap<>();
for (JoinFilterColumnCorrelationAnalysis jca : originalList) {
uniquesMap.put(jca.getBaseColumns(), jca);
Set<String> mapKey = new HashSet<>(jca.getBaseColumns());
for (Expr expr : jca.getBaseExpressions()) {
mapKey.add(expr.stringify());
}
uniquesMap.put(mapKey, jca);
}
return new ArrayList<>(uniquesMap.values());
@ -833,6 +1054,7 @@ public class JoinFilterAnalyzer
private static class RhsRewriteCandidate
{
private final boolean isDirectRewrite;
private final JoinableClause joinableClause;
private final String rhsColumn;
private final String valueForRewrite;
@ -840,12 +1062,14 @@ public class JoinFilterAnalyzer
public RhsRewriteCandidate(
JoinableClause joinableClause,
String rhsColumn,
String valueForRewrite
String valueForRewrite,
boolean isDirectRewrite
)
{
this.joinableClause = joinableClause;
this.rhsColumn = rhsColumn;
this.valueForRewrite = valueForRewrite;
this.isDirectRewrite = isDirectRewrite;
}
public JoinableClause getJoinableClause()
@ -862,5 +1086,16 @@ public class JoinFilterAnalyzer
{
return valueForRewrite;
}
/**
* A direct rewrite occurs when we filter on an RHS column that is also part of a join equicondition.
*
* For example, if we have the filter (j.x = 'hello') and the join condition is (y = j.x), we can directly
* rewrite the j.x filter to (y = 'hello').
*/
public boolean isDirectRewrite()
{
return isDirectRewrite;
}
}
}

View File

@ -19,13 +19,14 @@
package org.apache.druid.segment.join.filter;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.join.JoinableClause;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* A JoinFilterPreAnalysis contains filter push down/rewrite information that does not have per-segment dependencies.
@ -35,6 +36,7 @@ import java.util.Optional;
* - A list of filter clauses from the original filter's CNF representation that only reference the base table
* - A list of filter clauses from the original filter's CNF representation that reference RHS join tables
* - A mapping of RHS filtering columns -> List<JoinFilterColumnCorrelationAnalysis>, used for filter rewrites
* - A second mapping of RHS filtering columns -> List<JoinFilterColumnCorrelationAnalysis>, used for direct filter rewrites
* - A list of virtual columns that can only be computed post-join
* - Control flag booleans for whether filter push down and RHS rewrites are enabled.
*/
@ -44,10 +46,12 @@ public class JoinFilterPreAnalysis
private final Filter originalFilter;
private final List<Filter> normalizedBaseTableClauses;
private final List<Filter> normalizedJoinTableClauses;
private final Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationsByFilteringColumn;
private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn;
private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn;
private final boolean enableFilterPushDown;
private final boolean enableFilterRewrite;
private final List<VirtualColumn> postJoinVirtualColumns;
private final Map<String, Set<Expr>> equiconditions;
public JoinFilterPreAnalysis(
final List<JoinableClause> joinableClauses,
@ -55,9 +59,11 @@ public class JoinFilterPreAnalysis
final List<VirtualColumn> postJoinVirtualColumns,
final List<Filter> normalizedBaseTableClauses,
final List<Filter> normalizedJoinTableClauses,
final Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationsByFilteringColumn,
final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByFilteringColumn,
final Map<String, List<JoinFilterColumnCorrelationAnalysis>> correlationsByDirectFilteringColumn,
final boolean enableFilterPushDown,
final boolean enableFilterRewrite
final boolean enableFilterRewrite,
final Map<String, Set<Expr>> equiconditions
)
{
this.joinableClauses = joinableClauses;
@ -66,8 +72,10 @@ public class JoinFilterPreAnalysis
this.normalizedBaseTableClauses = normalizedBaseTableClauses;
this.normalizedJoinTableClauses = normalizedJoinTableClauses;
this.correlationsByFilteringColumn = correlationsByFilteringColumn;
this.correlationsByDirectFilteringColumn = correlationsByDirectFilteringColumn;
this.enableFilterPushDown = enableFilterPushDown;
this.enableFilterRewrite = enableFilterRewrite;
this.equiconditions = equiconditions;
}
public List<JoinableClause> getJoinableClauses()
@ -95,11 +103,16 @@ public class JoinFilterPreAnalysis
return normalizedJoinTableClauses;
}
public Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> getCorrelationsByFilteringColumn()
public Map<String, List<JoinFilterColumnCorrelationAnalysis>> getCorrelationsByFilteringColumn()
{
return correlationsByFilteringColumn;
}
public Map<String, List<JoinFilterColumnCorrelationAnalysis>> getCorrelationsByDirectFilteringColumn()
{
return correlationsByDirectFilteringColumn;
}
public boolean isEnableFilterPushDown()
{
return enableFilterPushDown;
@ -109,5 +122,10 @@ public class JoinFilterPreAnalysis
{
return enableFilterRewrite;
}
public Map<String, Set<Expr>> getEquiconditions()
{
return equiconditions;
}
}

View File

@ -23,9 +23,9 @@ import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
* Holds the result of splitting a filter into:
@ -37,12 +37,12 @@ public class JoinFilterSplit
{
final Optional<Filter> baseTableFilter;
final Optional<Filter> joinTableFilter;
final List<VirtualColumn> pushDownVirtualColumns;
final Set<VirtualColumn> pushDownVirtualColumns;
public JoinFilterSplit(
@Nullable Filter baseTableFilter,
@Nullable Filter joinTableFilter,
List<VirtualColumn> pushDownVirtualColumns
Set<VirtualColumn> pushDownVirtualColumns
)
{
this.baseTableFilter = baseTableFilter == null ? Optional.empty() : Optional.of(baseTableFilter);
@ -60,7 +60,7 @@ public class JoinFilterSplit
return joinTableFilter;
}
public List<VirtualColumn> getPushDownVirtualColumns()
public Set<VirtualColumn> getPushDownVirtualColumns()
{
return pushDownVirtualColumns;
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.filter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.junit.Assert;
@ -68,4 +69,13 @@ public class LikeDimFilterTest
final DimFilter filter = new LikeDimFilter("foo", "bar%", "@", new SubstringDimExtractionFn(1, 2));
Assert.assertEquals(filter.getRequiredColumns(), Sets.newHashSet("foo"));
}
@Test
public void test_LikeMatcher_equals()
{
EqualsVerifier.forClass(LikeDimFilter.LikeMatcher.class)
.usingGetClass()
.withNonnullFields("suffixMatch", "prefix", "pattern")
.verify();
}
}

View File

@ -21,19 +21,25 @@ package org.apache.druid.segment.filter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.JavaScriptExtractionFn;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.StorageAdapter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -60,6 +66,9 @@ public class BoundFilterTest extends BaseFilterTest
super(testName, ROWS, indexBuilder, finisher, cnf, optimize);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@AfterClass
public static void tearDown() throws Exception
{
@ -723,6 +732,26 @@ public class BoundFilterTest extends BaseFilterTest
);
}
@Test
public void testRequiredColumnRewrite()
{
BoundFilter filter = new BoundFilter(
new BoundDimFilter("dim0", "", "", false, false, true, null, StringComparators.ALPHANUMERIC)
);
BoundFilter filter2 = new BoundFilter(
new BoundDimFilter("dim1", "", "", false, false, true, null, StringComparators.ALPHANUMERIC)
);
Assert.assertTrue(filter.supportsRequiredColumnRewrite());
Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
Filter rewrittenFilter = filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
Assert.assertEquals(filter2, rewrittenFilter);
expectedException.expect(IAE.class);
expectedException.expectMessage("Received a non-applicable rewrite: {invalidName=dim1}, filter's dimension: dim0");
filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
}
@Test
public void test_equals()
{

View File

@ -37,12 +37,15 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -111,6 +114,9 @@ public class ExpressionFilterTest extends BaseFilterTest
);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@AfterClass
public static void tearDown() throws Exception
{
@ -273,6 +279,17 @@ public class ExpressionFilterTest extends BaseFilterTest
Assert.assertEquals(edf("missing == ''").getRequiredColumns(), Sets.newHashSet("missing"));
}
@Test
public void testRequiredColumnRewrite()
{
Filter filter = edf("dim1 == '1'").toFilter();
Assert.assertFalse(filter.supportsRequiredColumnRewrite());
expectedException.expect(UnsupportedOperationException.class);
expectedException.expectMessage("Required column rewrite is not supported by this filter.");
filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
}
private static ExpressionDimFilter edf(final String expression)
{
return new ExpressionDimFilter(expression, null, TestExprMacroTable.INSTANCE);

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.filter;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.dimension.DimensionSpec;
@ -354,6 +355,16 @@ public class FilterCnfConversionTest
assertFilter(filter, expectedCnf, Filters.toCnf(filter));
}
@Test
public void testTrueFalseFilterRequiredColumnRewrite()
{
Assert.assertTrue(TrueFilter.instance().supportsRequiredColumnRewrite());
Assert.assertTrue(FalseFilter.instance().supportsRequiredColumnRewrite());
Assert.assertEquals(TrueFilter.instance(), TrueFilter.instance().rewriteRequiredColumns(ImmutableMap.of()));
Assert.assertEquals(FalseFilter.instance(), FalseFilter.instance().rewriteRequiredColumns(ImmutableMap.of()));
}
private void assertFilter(Filter original, Filter expectedConverted, Filter actualConverted)
{
assertEquivalent(original, expectedConverted);

View File

@ -32,19 +32,24 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.JavaScriptExtractionFn;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.lookup.LookupExtractionFn;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.StorageAdapter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -85,6 +90,9 @@ public class InFilterTest extends BaseFilterTest
super(testName, ROWS, indexBuilder, finisher, cnf, optimize);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@AfterClass
public static void tearDown() throws Exception
{
@ -353,6 +361,23 @@ public class InFilterTest extends BaseFilterTest
}
@Test
public void testRequiredColumnRewrite()
{
InFilter filter = (InFilter) toInFilter("dim0", "a", "c").toFilter();
InFilter filter2 = (InFilter) toInFilter("dim1", "a", "c").toFilter();
Assert.assertTrue(filter.supportsRequiredColumnRewrite());
Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
Filter rewrittenFilter = filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
Assert.assertEquals(filter2, rewrittenFilter);
expectedException.expect(IAE.class);
expectedException.expectMessage("Received a non-applicable rewrite: {invalidName=dim1}, filter's dimension: dim0");
filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
}
@Test
public void test_equals()
{

View File

@ -27,13 +27,17 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.JavaScriptDimFilter;
import org.apache.druid.query.lookup.LookupExtractionFn;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.StorageAdapter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -54,6 +58,9 @@ public class JavaScriptFilterTest extends BaseFilterTest
super(testName, DEFAULT_ROWS, indexBuilder, finisher, cnf, optimize);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@AfterClass
public static void tearDown() throws Exception
{
@ -221,6 +228,17 @@ public class JavaScriptFilterTest extends BaseFilterTest
assertFilterMatchesSkipVectorize(newJavaScriptDimFilter("l0", jsNumericValueFilter("9001"), null), ImmutableList.of("4"));
}
@Test
public void testRequiredColumnRewrite()
{
Filter filter = newJavaScriptDimFilter("dim3", jsValueFilter("a"), null).toFilter();
Assert.assertFalse(filter.supportsRequiredColumnRewrite());
expectedException.expect(UnsupportedOperationException.class);
expectedException.expectMessage("Required column rewrite is not supported by this filter.");
filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
}
private JavaScriptDimFilter newJavaScriptDimFilter(
final String dimension,
final String function,

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.filter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -30,13 +31,18 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.StorageAdapter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -77,6 +83,9 @@ public class LikeFilterTest extends BaseFilterTest
super(testName, ROWS, indexBuilder, finisher, cnf, optimize);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@AfterClass
public static void tearDown() throws Exception
{
@ -262,4 +271,30 @@ public class LikeFilterTest extends BaseFilterTest
ImmutableList.of("6")
);
}
@Test
public void testRequiredColumnRewrite()
{
Filter filter = new LikeDimFilter("dim0", "e%", null, new SubstringDimExtractionFn(1, 100)).toFilter();
Filter filter2 = new LikeDimFilter("dim1", "e%", null, new SubstringDimExtractionFn(1, 100)).toFilter();
Assert.assertTrue(filter.supportsRequiredColumnRewrite());
Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
Filter rewrittenFilter = filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
Assert.assertEquals(filter2, rewrittenFilter);
expectedException.expect(IAE.class);
expectedException.expectMessage("Received a non-applicable rewrite: {invalidName=dim1}, filter's dimension: dim0");
filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(LikeFilter.class)
.usingGetClass()
.withNonnullFields("dimension", "likeMatcher")
.verify();
}
}

View File

@ -19,7 +19,10 @@
package org.apache.druid.segment.filter;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.Filter;
import org.junit.Assert;
import org.junit.Test;
@ -39,4 +42,21 @@ public class NotFilterTest
final Filter notFilter = FilterTestUtils.not(baseFilter);
Assert.assertNotEquals(notFilter.hashCode(), baseFilter.hashCode());
}
@Test
public void testRequiredColumnRewrite()
{
  // A NOT filter delegates rewrite support to its wrapped filter;
  // SelectorFilter supports rewrites, so the NOT wrapper does too.
  final Filter notOnDim0 = new NotFilter(new SelectorFilter("dim0", "B"));
  final Filter notOnDim1 = new NotFilter(new SelectorFilter("dim1", "B"));

  Assert.assertTrue(notOnDim0.supportsRequiredColumnRewrite());
  Assert.assertTrue(notOnDim1.supportsRequiredColumnRewrite());

  // Rewriting dim0 -> dim1 inside the NOT must equal a NOT built directly on dim1.
  Assert.assertEquals(notOnDim1, notOnDim0.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1")));

  // Expression-based filters do not support rewrites, so neither does a NOT wrapping one.
  final Filter notOnExpression = new NotFilter(new ExpressionDimFilter("dim0 == 'B'", ExprMacroTable.nil()).toFilter());
  Assert.assertFalse(notOnExpression.supportsRequiredColumnRewrite());
}
}

View File

@ -21,16 +21,23 @@ package org.apache.druid.segment.filter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.JavaScriptExtractionFn;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.RegexDimFilter;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.StorageAdapter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -50,6 +57,9 @@ public class RegexFilterTest extends BaseFilterTest
super(testName, DEFAULT_ROWS, indexBuilder, finisher, cnf, optimize);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@AfterClass
public static void tearDown() throws Exception
{
@ -135,4 +145,31 @@ public class RegexFilterTest extends BaseFilterTest
assertFilterMatches(new RegexDimFilter("dim4", ".*ANYMORE", changeNullFn), ImmutableList.of("0", "1", "2", "3", "4", "5"));
assertFilterMatches(new RegexDimFilter("dim4", "a.*", changeNullFn), ImmutableList.of());
}
@Test
public void testRequiredColumnRewrite()
{
  // Regex filters support column rewrites; build the same match-all filter on two columns.
  final Filter filterOnDim0 = new RegexDimFilter("dim0", ".*", null).toFilter();
  final Filter filterOnDim1 = new RegexDimFilter("dim1", ".*", null).toFilter();

  Assert.assertTrue(filterOnDim0.supportsRequiredColumnRewrite());
  Assert.assertTrue(filterOnDim1.supportsRequiredColumnRewrite());

  // Rewriting dim0 -> dim1 must produce a filter equal to one built directly on dim1.
  Assert.assertEquals(filterOnDim1, filterOnDim0.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1")));

  // A rewrite map that does not mention the filter's dimension is rejected with an IAE.
  expectedException.expect(IAE.class);
  expectedException.expectMessage("Received a non-applicable rewrite: {invalidName=dim1}, filter's dimension: dim0");
  filterOnDim0.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
}
@Test
public void test_equals()
{
  // Verify the equals/hashCode contract for RegexFilter. The predicateFactory
  // field is derived state and intentionally excluded from equality.
  EqualsVerifier.forClass(RegexFilter.class)
                .withNonnullFields("dimension", "pattern")
                .withIgnoredFields("predicateFactory")
                .usingGetClass()
                .verify();
}
}

View File

@ -21,18 +21,25 @@ package org.apache.druid.segment.filter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.JavaScriptExtractionFn;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.SearchQueryDimFilter;
import org.apache.druid.query.search.ContainsSearchQuerySpec;
import org.apache.druid.query.search.SearchQuerySpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.StorageAdapter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -52,6 +59,9 @@ public class SearchQueryFilterTest extends BaseFilterTest
super(testName, DEFAULT_ROWS, indexBuilder, finisher, cnf, optimize);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@AfterClass
public static void tearDown() throws Exception
{
@ -172,4 +182,31 @@ public class SearchQueryFilterTest extends BaseFilterTest
assertFilterMatches(new SearchQueryDimFilter("dim4", specForValue("ANYMORE"), changeNullFn), ImmutableList.of("0", "1", "2", "3", "4", "5"));
assertFilterMatches(new SearchQueryDimFilter("dim4", specForValue("a"), changeNullFn), ImmutableList.of());
}
@Test
public void testRequiredColumnRewrite()
{
  // Search-query filters support column rewrites; build the same contains-spec filter
  // on two different columns.
  final Filter filterOnDim0 = new SearchQueryDimFilter("dim0", specForValue("a"), null).toFilter();
  final Filter filterOnDim1 = new SearchQueryDimFilter("dim1", specForValue("a"), null).toFilter();

  Assert.assertTrue(filterOnDim0.supportsRequiredColumnRewrite());
  Assert.assertTrue(filterOnDim1.supportsRequiredColumnRewrite());

  // Rewriting dim0 -> dim1 must produce a filter equal to one built directly on dim1.
  Assert.assertEquals(filterOnDim1, filterOnDim0.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1")));

  // A rewrite map that does not mention the filter's dimension is rejected with an IAE.
  expectedException.expect(IAE.class);
  expectedException.expectMessage("Received a non-applicable rewrite: {invalidName=dim1}, filter's dimension: dim0");
  filterOnDim0.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
}
@Test
public void test_equals()
{
  // Verify the equals/hashCode contract for SearchQueryFilter. The predicateFactory
  // field is derived state and intentionally excluded from equality.
  EqualsVerifier.forClass(SearchQueryFilter.class)
                .withNonnullFields("dimension", "query")
                .withIgnoredFields("predicateFactory")
                .usingGetClass()
                .verify();
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -30,8 +31,10 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.AndFilter;
@ -48,6 +51,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Set;
public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTest
{
@ -74,7 +78,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
new SelectorFilter("channel", "#en.wikipedia"),
null,
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -112,7 +116,6 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
}
@Test
public void test_filterPushDown_factToRegionExprToCountryLeftFilterOnCountryName()
{
@ -151,7 +154,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
null,
new SelectorFilter("rtc.countryName", "United States"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -210,7 +213,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter("rtc.countryName", "United States"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -277,7 +280,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter("r1.regionName", null)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
@ -320,6 +323,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
Filter originalFilter = new AndFilter(
ImmutableList.of(
new SelectorFilter("baseTableInvalidColumn", "abcd"),
new SelectorFilter("baseTableInvalidColumn2", null),
new SelectorFilter("rtc.invalidColumn", "abcd"),
new SelectorFilter("r1.invalidColumn", "abcd")
)
@ -340,11 +344,12 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter("baseTableInvalidColumn", "abcd"),
new AndFilter(
ImmutableList.of(
new SelectorFilter("baseTableInvalidColumn2", null),
new SelectorFilter("rtc.invalidColumn", "abcd"),
new SelectorFilter("r1.invalidColumn", "abcd")
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -406,7 +411,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
new SelectorFilter("v1", "virtual-column-#en.wikipedia"),
null,
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -484,7 +489,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
null,
new SelectorFilter("v0", "VIRGINIA"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -646,7 +651,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -717,7 +722,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter("rtc.countryName", "States United"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
@ -736,7 +741,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
actualFilterSplit.getJoinTableFilter()
);
ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) actualFilterSplit.getPushDownVirtualColumns()
.get(0);
.iterator().next();
compareExpressionVirtualColumns(expectedVirtualColumn, actualVirtualColumn);
JoinTestHelper.verifyCursors(
@ -887,7 +892,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -964,7 +969,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter("c1.countryName", "Usca"),
ImmutableList.of(
ImmutableSet.of(
expectedVirtualColumn
)
);
@ -978,7 +983,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
actualFilterSplit.getJoinTableFilter()
);
ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) actualFilterSplit.getPushDownVirtualColumns()
.get(0);
.iterator().next();
compareExpressionVirtualColumns(expectedVirtualColumn, actualVirtualColumn);
JoinTestHelper.verifyCursors(
@ -1053,7 +1058,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter("c1.v", "Usca"),
ImmutableList.of(
ImmutableSet.of(
expectedVirtualColumn
)
);
@ -1067,7 +1072,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
actualFilterSplit.getJoinTableFilter()
);
ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) actualFilterSplit.getPushDownVirtualColumns()
.get(0);
.iterator().next();
compareExpressionVirtualColumns(expectedVirtualColumn, actualVirtualColumn);
JoinTestHelper.verifyCursors(
@ -1119,7 +1124,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "Germany"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1175,7 +1180,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", "Germany"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1230,7 +1235,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", null)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1284,7 +1289,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", null)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1337,7 +1342,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", "Australia"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1399,7 +1404,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "v", "Australia"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1460,7 +1465,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", null)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1513,7 +1518,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "v", null)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1565,7 +1570,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "El Salvador"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1621,7 +1626,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", "El Salvador"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1676,7 +1681,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", null)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1730,7 +1735,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", null)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1796,7 +1801,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
),
new SelectorFilter("r1.regionName", "Fourems Province"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1840,7 +1845,12 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
List<JoinableClause> joinableClauses = ImmutableList.of(
factExprToRegon
);
Filter originalFilter = new SelectorFilter("r1.regionName", "Fourems Province");
Filter originalFilter = new OrFilter(
ImmutableList.of(
new SelectorFilter("r1.regionName", "Fourems Province"),
new SelectorFilter("r1.regionIsoCode", "AAAA")
)
);
JoinFilterPreAnalysis joinFilterPreAnalysis = simplePreAnalysis(
joinableClauses,
@ -1853,9 +1863,14 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
new InDimFilter("regionIsoCode", ImmutableSet.of("MMMM"), null, null).toFilter(),
new SelectorFilter("r1.regionName", "Fourems Province"),
ImmutableList.of()
new OrFilter(
ImmutableList.of(
new InDimFilter("regionIsoCode", ImmutableSet.of("MMMM"), null, null).toFilter(),
new SelectorFilter("regionIsoCode", "AAAA")
)
),
originalFilter,
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -1879,6 +1894,83 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
}
@Test
public void test_filterPushDown_factToRegionThreeRHSColumnsAllDirectAndFilterOnRHS()
{
  // Join condition maps three RHS columns directly onto LHS columns
  // (regionIsoCode twice, and regionName onto the fact table's "user" column).
  final JoinableClause factExprToRegion = new JoinableClause(
      FACT_TO_REGION_PREFIX,
      new IndexedTableJoinable(regionsTable),
      JoinType.LEFT,
      JoinConditionAnalysis.forExpression(
          StringUtils.format(
              "\"%sregionIsoCode\" == regionIsoCode && \"%scountryIsoCode\" == regionIsoCode && \"%sregionName\" == user",
              FACT_TO_REGION_PREFIX,
              FACT_TO_REGION_PREFIX,
              FACT_TO_REGION_PREFIX
          ),
          FACT_TO_REGION_PREFIX,
          ExprMacroTable.nil()
      )
  );
  final List<JoinableClause> joinableClauses = ImmutableList.of(
      factExprToRegion
  );

  // Filter purely on RHS columns that the condition maps directly to LHS columns.
  final Filter originalFilter = new OrFilter(
      ImmutableList.of(
          new SelectorFilter("r1.regionName", "Fourems Province"),
          new SelectorFilter("r1.regionIsoCode", "AAAA")
      )
  );

  final JoinFilterPreAnalysis joinFilterPreAnalysis = simplePreAnalysis(
      joinableClauses,
      originalFilter
  );

  final HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter(
      factSegment.asStorageAdapter(),
      joinableClauses,
      joinFilterPreAnalysis
  );

  // Both RHS references rewrite directly into their LHS equivalents, so the whole
  // filter pushes down and nothing remains on the join table.
  final JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
      new OrFilter(
          ImmutableList.of(
              new SelectorFilter("user", "Fourems Province"),
              new SelectorFilter("regionIsoCode", "AAAA")
          )
      ),
      null,
      ImmutableSet.of()
  );
  final JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
  Assert.assertEquals(expectedFilterSplit, actualFilterSplit);

  // This query doesn't execute because regionName is not a key column, but we can still check the
  // filter rewrites.
  expectedException.expect(IAE.class);
  expectedException.expectMessage(
      "Cannot build hash-join matcher on non-key-based condition: Equality{leftExpr=user, rightColumn='regionName'}"
  );

  JoinTestHelper.verifyCursors(
      adapter.makeCursors(
          originalFilter,
          Intervals.ETERNITY,
          VirtualColumns.EMPTY,
          Granularities.ALL,
          false,
          null
      ),
      ImmutableList.of(
          "page",
          FACT_TO_REGION_PREFIX + "regionName"
      ),
      ImmutableList.of()
  );
}
@Test
public void test_filterPushDown_factToRegionToCountryLeftFilterOnPageDisablePushDown()
{
@ -1906,7 +1998,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
null,
new SelectorFilter("page", "Peremptory norm"),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -2002,7 +2094,7 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
)
)
),
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -2034,9 +2126,105 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
@Test
public void test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumns()
{
Filter originalFilter = new SelectorFilter("rtc.countryIsoCode", "CA");
test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsHelper(false);
}
@Test
public void test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsWithLhsExpr()
{
test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsHelper(true);
}
private void test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsHelper(boolean hasLhsExpressionInJoinCondition)
{
Filter expressionFilter = new ExpressionDimFilter(
"\"rtc.countryIsoCode\" == 'CA'",
ExprMacroTable.nil()
).toFilter();
Filter specialSelectorFilter = new SelectorFilter("rtc.countryIsoCode", "CA") {
@Override
public boolean supportsRequiredColumnRewrite()
{
return false;
}
};
Filter originalFilter = new AndFilter(
ImmutableList.of(
new SelectorFilter("r1.regionIsoCode", "ON"),
new SelectorFilter("rtc.countryIsoCode", "CA"),
specialSelectorFilter,
new BoundFilter(new BoundDimFilter(
"rtc.countryIsoCode",
"CA",
"CB",
false,
false,
null,
null,
null
)),
expressionFilter,
new InDimFilter("rtc.countryIsoCode", ImmutableSet.of("CA", "CA2", "CA3"), null, null).toFilter(),
new OrFilter(
ImmutableList.of(
new SelectorFilter("channel", "#fr.wikipedia"),
new SelectorFilter("rtc.countryIsoCode", "QQQ"),
new BoundFilter(new BoundDimFilter(
"rtc.countryIsoCode",
"YYY",
"ZZZ",
false,
false,
null,
null,
null
))
)
),
new OrFilter(
ImmutableList.of(
new SelectorFilter("namespace", "Main"),
new SelectorFilter("rtc.countryIsoCode", "ABCDEF"),
new SelectorFilter("rtc.countryName", "Canada"),
new BoundFilter(new BoundDimFilter(
"rtc.countryIsoCode",
"XYZXYZ",
"XYZXYZ",
false,
false,
null,
null,
null
))
)
)
)
);
JoinableClause factToRegionClause;
if (hasLhsExpressionInJoinCondition) {
factToRegionClause = new JoinableClause(
FACT_TO_REGION_PREFIX,
new IndexedTableJoinable(regionsTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%sregionIsoCode\" == upper(lower(regionIsoCode)) && \"%scountryIsoCode\" == upper(lower(countryIsoCode))",
FACT_TO_REGION_PREFIX,
FACT_TO_REGION_PREFIX
),
FACT_TO_REGION_PREFIX,
ExprMacroTable.nil()
)
);
} else {
factToRegionClause = factToRegion(JoinType.LEFT);
}
List<JoinableClause> joinableClauses = ImmutableList.of(
factToRegion(JoinType.LEFT),
factToRegionClause,
regionToCountry(JoinType.LEFT)
);
@ -2051,10 +2239,111 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
joinFilterPreAnalysis
);
String rewrittenCountryIsoCodeColumnName = hasLhsExpressionInJoinCondition
? "JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-0"
: "countryIsoCode";
String rewrittenRegionIsoCodeColumnName = hasLhsExpressionInJoinCondition
? "JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-1"
: "regionIsoCode";
Set<VirtualColumn> expectedVirtualColumns;
if (hasLhsExpressionInJoinCondition) {
expectedVirtualColumns = ImmutableSet.of(
new ExpressionVirtualColumn(
rewrittenRegionIsoCodeColumnName,
"(upper [(lower [regionIsoCode])])",
ValueType.STRING,
ExprMacroTable.nil()
),
new ExpressionVirtualColumn(
rewrittenCountryIsoCodeColumnName,
"(upper [(lower [countryIsoCode])])",
ValueType.STRING,
ExprMacroTable.nil()
)
);
} else {
expectedVirtualColumns = ImmutableSet.of();
}
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
new InDimFilter("countryIsoCode", ImmutableSet.of("CA"), null, null).toFilter(),
new SelectorFilter("rtc.countryIsoCode", "CA"),
ImmutableList.of()
new AndFilter(
ImmutableList.of(
new SelectorFilter(rewrittenRegionIsoCodeColumnName, "ON"),
new SelectorFilter(rewrittenCountryIsoCodeColumnName, "CA"),
new BoundFilter(new BoundDimFilter(
rewrittenCountryIsoCodeColumnName,
"CA",
"CB",
false,
false,
null,
null,
null
)),
new InDimFilter(rewrittenCountryIsoCodeColumnName, ImmutableSet.of("CA", "CA2", "CA3"), null, null).toFilter(),
new InDimFilter(rewrittenCountryIsoCodeColumnName, ImmutableSet.of("CA"), null, null).toFilter(),
new OrFilter(
ImmutableList.of(
new SelectorFilter("channel", "#fr.wikipedia"),
new SelectorFilter(rewrittenCountryIsoCodeColumnName, "QQQ"),
new BoundFilter(new BoundDimFilter(
rewrittenCountryIsoCodeColumnName,
"YYY",
"ZZZ",
false,
false,
null,
null,
null
))
)
),
new OrFilter(
ImmutableList.of(
new SelectorFilter("namespace", "Main"),
new SelectorFilter(rewrittenCountryIsoCodeColumnName, "ABCDEF"),
new InDimFilter(rewrittenCountryIsoCodeColumnName, ImmutableSet.of("CA"), null, null).toFilter(),
new BoundFilter(new BoundDimFilter(
rewrittenCountryIsoCodeColumnName,
"XYZXYZ",
"XYZXYZ",
false,
false,
null,
null,
null
))
)
)
)
),
new AndFilter(
ImmutableList.of(
specialSelectorFilter,
expressionFilter,
new OrFilter(
ImmutableList.of(
new SelectorFilter("namespace", "Main"),
new SelectorFilter("rtc.countryIsoCode", "ABCDEF"),
new SelectorFilter("rtc.countryName", "Canada"),
new BoundFilter(new BoundDimFilter(
"rtc.countryIsoCode",
"XYZXYZ",
"XYZXYZ",
false,
false,
null,
null,
null
))
)
)
)
),
expectedVirtualColumns
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@ -2074,22 +2363,18 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
REGION_TO_COUNTRY_PREFIX + "countryName"
),
ImmutableList.of(
new Object[]{"Didier Leclair", "Ontario", "Canada"},
new Object[]{"Les Argonautes", "Quebec", "Canada"},
new Object[]{"Sarah Michelle Gellar", "Ontario", "Canada"}
new Object[]{"Didier Leclair", "Ontario", "Canada"}
)
);
}
@Test
public void test_filterPushDown_factToRegionToCountryLeftFilterOnTwoRHSColumnsSameValue()
{
Filter originalFilter = new AndFilter(
ImmutableList.of(
new SelectorFilter("r1.regionIsoCode", "CA"),
new SelectorFilter("r1.countryIsoCode", "CA")
new SelectorFilter("r1.regionName", "California"),
new SelectorFilter("r1.extraField", "California")
)
);
@ -2113,20 +2398,20 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
ImmutableList.of(
new AndFilter(
ImmutableList.of(
new InDimFilter("countryIsoCode", ImmutableSet.of("US"), null, null).toFilter(),
new InDimFilter("regionIsoCode", ImmutableSet.of("CA"), null, null).toFilter()
new InDimFilter("countryIsoCode", ImmutableSet.of("MMMM", "AAAA"), null, null).toFilter(),
new InDimFilter("regionIsoCode", ImmutableSet.of("MMMM", "AAAA"), null, null).toFilter()
)
),
new AndFilter(
ImmutableList.of(
new InDimFilter("countryIsoCode", ImmutableSet.of("CA"), null, null).toFilter(),
new InDimFilter("regionIsoCode", ImmutableSet.of("ON", "QC"), null, null).toFilter()
new InDimFilter("countryIsoCode", ImmutableSet.of("US"), null, null).toFilter(),
new InDimFilter("regionIsoCode", ImmutableSet.of("CA"), null, null).toFilter()
)
)
)
),
originalFilter,
ImmutableList.of()
ImmutableSet.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);

View File

@ -99,6 +99,7 @@ public class JoinTestHelper
.add("regionIsoCode", ValueType.STRING)
.add("countryIsoCode", ValueType.STRING)
.add("regionName", ValueType.STRING)
.add("extraField", ValueType.STRING)
.build();
private static final ColumnProcessorFactory<Supplier<Object>> SIMPLE_READER =

View File

@ -17,4 +17,5 @@
{"regionIsoCode":"VA","countryIsoCode":"US","regionName":"Virginia"}
{"regionIsoCode":"AV","countryIsoCode":"SU","regionName":"Ainigriv"}
{"regionIsoCode":"ZZ","countryIsoCode":"USCA","regionName":"Usca City"}
{"regionIsoCode":"MMMM","countryIsoCode":"MMMM","regionName":"Fourems Province"}
{"regionIsoCode":"MMMM","countryIsoCode":"MMMM","regionName":"Fourems Province", "extraField":"California"}
{"regionIsoCode":"AAAA","countryIsoCode":"AAAA","regionName":"Foureis Province", "extraField":"California"}