diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java index 13b0541f546..cbb64461575 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java @@ -41,6 +41,7 @@ import org.apache.druid.segment.column.DictionaryEncodedColumn; import org.apache.druid.segment.column.NumericColumn; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.filter.AndFilter; +import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.vector.VectorCursor; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -430,15 +431,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - final Filter postFilter; - if (postFilters.size() == 0) { - postFilter = null; - } else if (postFilters.size() == 1) { - postFilter = postFilters.get(0); - } else { - postFilter = new AndFilter(postFilters); - } - if (queryMetrics != null) { queryMetrics.preFilters(preFilters); queryMetrics.postFilters(postFilters); @@ -446,7 +438,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter queryMetrics.reportPreFilteredRows(preFilteredRows); } - return new FilterAnalysis(preFilterBitmap, postFilter); + return new FilterAnalysis(preFilterBitmap, Filters.and(postFilters)); } @VisibleForTesting diff --git a/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java index 2192340f7fc..93c890a9808 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/AndFilter.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** */ @@ -234,4 +235,23 @@ public class AndFilter implements BooleanFilter } }; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AndFilter andFilter = (AndFilter) o; + return Objects.equals(getFilters(), andFilter.getFilters()); + } + + @Override + public int hashCode() + { + return Objects.hash(getFilters()); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java index ee37e1d59fe..664740ac334 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java @@ -47,6 +47,7 @@ import org.apache.druid.segment.column.BitmapIndex; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import java.util.Comparator; +import java.util.Objects; import java.util.Set; public class BoundFilter implements Filter @@ -306,4 +307,26 @@ public class BoundFilter implements Filter } return (lowerComparing >= 0) && (upperComparing >= 0); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BoundFilter that = (BoundFilter) o; + return Objects.equals(boundDimFilter, that.boundDimFilter) && + Objects.equals(comparator, that.comparator) && + Objects.equals(extractionFn, that.extractionFn) && + Objects.equals(filterTuning, that.filterTuning); + } + + @Override + public int hashCode() + { + return Objects.hash(boundDimFilter, comparator, extractionFn, filterTuning); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/filter/Filters.java b/processing/src/main/java/org/apache/druid/segment/filter/Filters.java index 79dfd407165..1b32aa07b95 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/Filters.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/Filters.java @@ -641,4 +641,26 @@ public class Filters } return false; } + + /** + * Create a filter representing an AND relationship across a list of filters. + * + * @param filterList List of filters + * @return If filterList has more than one element, return an AND filter composed of the filters from filterList + * If filterList has a single element, return that element alone + * If filterList is empty, return null + */ + @Nullable + public static Filter and(List filterList) + { + if (filterList.isEmpty()) { + return null; + } + + if (filterList.size() == 1) { + return filterList.get(0); + } + + return new AndFilter(filterList); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java index 1aec9cc0063..54ad64b5066 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java @@ -45,6 +45,7 @@ import org.apache.druid.segment.column.BitmapIndex; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import java.util.Iterator; +import java.util.Objects; import java.util.Set; /** @@ -235,4 +236,26 @@ public class InFilter implements Filter } }; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InFilter inFilter = (InFilter) o; + return Objects.equals(dimension, inFilter.dimension) && + Objects.equals(values, inFilter.values) && + Objects.equals(extractionFn, inFilter.extractionFn) && + Objects.equals(filterTuning, inFilter.filterTuning); + } + + @Override + public int hashCode() + { + return Objects.hash(dimension, values, extractionFn, filterTuning); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java index 1c54de1b347..9e5314b19e0 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/OrFilter.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** */ @@ -218,4 +219,23 @@ public class OrFilter implements BooleanFilter } }; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OrFilter orFilter = (OrFilter) o; + return Objects.equals(getFilters(), orFilter.getFilters()); + } + + @Override + public int hashCode() + { + return Objects.hash(getFilters()); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java index 7d29d0e3feb..75874278d7a 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java @@ -34,6 +34,7 @@ import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; +import java.util.Objects; import java.util.Set; /** @@ -127,4 +128,35 @@ public class SelectorFilter implements Filter { return StringUtils.format("%s = %s", dimension, value); } + + public String getDimension() + { + return dimension; + } + + public String getValue() + { + return value; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SelectorFilter that = (SelectorFilter) o; + return Objects.equals(getDimension(), that.getDimension()) && + Objects.equals(getValue(), that.getValue()) && + Objects.equals(filterTuning, that.filterTuning); + } + + @Override + public int hashCode() + { + return Objects.hash(getDimension(), getValue(), filterTuning); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java index 72dc0f551cc..b269ec216e7 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java @@ -35,6 +35,8 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.join.filter.JoinFilterAnalyzer; +import org.apache.druid.segment.join.filter.JoinFilterSplit; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -223,13 +225,19 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter } } + JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter( + this, + filter + ); + preJoinVirtualColumns.addAll(joinFilterSplit.getPushDownVirtualColumns()); + // Soon, we will need a way to push filters past a join when possible. This could potentially be done right here // (by splitting out pushable pieces of 'filter') or it could be done at a higher level (i.e. in the SQL planner). // // If it's done in the SQL planner, that will likely mean adding a 'baseFilter' parameter to this class that would // be passed in to the below baseAdapter.makeCursors call (instead of the null filter). final Sequence baseCursorSequence = baseAdapter.makeCursors( - null, + joinFilterSplit.getBaseTableFilter().isPresent() ? joinFilterSplit.getBaseTableFilter().get() : null, interval, VirtualColumns.create(preJoinVirtualColumns), gran, @@ -246,16 +254,25 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter retVal = HashJoinEngine.makeJoinCursor(retVal, clause); } - return PostJoinCursor.wrap(retVal, VirtualColumns.create(postJoinVirtualColumns), filter); + return PostJoinCursor.wrap( + retVal, + VirtualColumns.create(postJoinVirtualColumns), + joinFilterSplit.getJoinTableFilter().isPresent() ? joinFilterSplit.getJoinTableFilter().get() : null + ); } ); } + public List getClauses() + { + return clauses; + } + /** * Returns whether "column" will be selected from "baseAdapter". This is true if it is not shadowed by any joinables * (i.e. if it does not start with any of their prefixes). */ - private boolean isBaseColumn(final String column) + public boolean isBaseColumn(final String column) { return !getClauseForColumn(column).isPresent(); } diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java index b516afbc6cf..63c58b44031 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java @@ -24,6 +24,7 @@ import org.apache.druid.segment.column.ColumnCapabilities; import javax.annotation.Nullable; import java.util.List; +import java.util.Set; /** * Represents something that can be the right-hand side of a join. @@ -75,4 +76,19 @@ public interface Joinable JoinConditionAnalysis condition, boolean remainderNeeded ); + + /** + * Searches a column from this Joinable for a particular value, finds rows that match, + * and returns values of a second column for those rows. + * + * @param searchColumnName Name of the search column + * @param searchColumnValue Target value of the search column + * @param retrievalColumnName The column to retrieve values from + * @return The set of correlated column values. If we cannot determine correlated values, return an empty set. + */ + Set getCorrelatedColumnValues( + String searchColumnName, + String searchColumnValue, + String retrievalColumnName + ); } diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/AllNullColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/join/filter/AllNullColumnSelectorFactory.java new file mode 100644 index 00000000000..79756853063 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/AllNullColumnSelectorFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.filter; + +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; + +public class AllNullColumnSelectorFactory implements ColumnSelectorFactory +{ + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return DimensionSelector.constant(null); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + return NilColumnValueSelector.instance(); + } + + @Override + public ColumnCapabilities getColumnCapabilities(String columnName) + { + return null; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalysis.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalysis.java new file mode 100644 index 00000000000..4f8166a62b0 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalysis.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.filter; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.VirtualColumn; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Optional; + +/** + * Holds information about: + * - whether a filter can be pushed down + * - if it needs to be retained after the join, + * - a reference to the original filter + * - a potentially rewritten filter to be pushed down to the base table + * - a list of virtual columns that need to be created on the base table to support the pushed down filter + */ +public class JoinFilterAnalysis +{ + private final boolean retainAfterJoin; + private final Filter originalFilter; + private final Optional pushDownFilter; + private final List pushDownVirtualColumns; + + public JoinFilterAnalysis( + boolean retainAfterJoin, + Filter originalFilter, + @Nullable Filter pushDownFilter, + List pushDownVirtualColumns + ) + { + this.retainAfterJoin = retainAfterJoin; + this.originalFilter = originalFilter; + this.pushDownFilter = pushDownFilter == null ? Optional.empty() : Optional.of(pushDownFilter); + this.pushDownVirtualColumns = pushDownVirtualColumns; + } + + public boolean isCanPushDown() + { + return pushDownFilter.isPresent(); + } + + public boolean isRetainAfterJoin() + { + return retainAfterJoin; + } + + public Filter getOriginalFilter() + { + return originalFilter; + } + + public Optional getPushDownFilter() + { + return pushDownFilter; + } + + public List getPushDownVirtualColumns() + { + return pushDownVirtualColumns; + } + + /** + * Utility method for generating an analysis that represents: "Filter cannot be pushed down" + * + * @param originalFilter The original filter which cannot be pushed down + * + * @return analysis that represents: "Filter cannot be pushed down" + */ + public static JoinFilterAnalysis createNoPushdownFilterAnalysis(Filter originalFilter) + { + return new JoinFilterAnalysis( + true, + originalFilter, + null, + ImmutableList.of() + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java new file mode 100644 index 00000000000..c5a47c1eae0 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.filter; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.math.expr.Expr; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.filter.AndFilter; +import org.apache.druid.segment.filter.Filters; +import org.apache.druid.segment.filter.OrFilter; +import org.apache.druid.segment.filter.SelectorFilter; +import org.apache.druid.segment.join.Equality; +import org.apache.druid.segment.join.HashJoinSegmentStorageAdapter; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinableClause; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * When there is a filter in a join query, we can sometimes improve performance by applying parts of the filter + * when we first read from the base table instead of after the join. + * + * This class provides a {@link #splitFilter(HashJoinSegmentStorageAdapter, Filter)} method that + * takes a filter and splits it into a portion that should be applied to the base table prior to the join, and a + * portion that should be applied after the join. + * + * The first step of the filter splitting is to convert the fllter into + * https://en.wikipedia.org/wiki/Conjunctive_normal_form (an AND of ORs). This allows us to consider each + * OR clause independently as a candidate for filter push down to the base table. + * + * A filter clause can be pushed down if it meets one of the following conditions: + * - The filter only applies to columns from the base table + * - The filter applies to columns from the join table, and we determine that the filter can be rewritten + * into a filter on columns from the base table + * + * For the second case, where we rewrite filter clauses, the rewritten clause can be less selective than the original, + * so we preserve the original clause in the post-join filtering phase. + */ +public class JoinFilterAnalyzer +{ + private static final String PUSH_DOWN_VIRTUAL_COLUMN_NAME_BASE = "JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-"; + private static final ColumnSelectorFactory ALL_NULL_COLUMN_SELECTOR_FACTORY = new AllNullColumnSelectorFactory(); + + public static JoinFilterSplit splitFilter( + HashJoinSegmentStorageAdapter hashJoinSegmentStorageAdapter, + @Nullable Filter originalFilter + ) + { + if (originalFilter == null) { + return new JoinFilterSplit( + null, + null, + ImmutableList.of() + ); + } + + Filter normalizedFilter = Filters.convertToCNF(originalFilter); + + // build the prefix and equicondition maps + // We should check that the prefixes do not duplicate or shadow each other. This is not currently implemented, + // but this is tracked at https://github.com/apache/druid/issues/9329 + // We should also consider the case where one RHS column is joined to multiple columns: + // https://github.com/apache/druid/issues/9328 + Map equiconditions = new HashMap<>(); + Map prefixes = new HashMap<>(); + for (JoinableClause clause : hashJoinSegmentStorageAdapter.getClauses()) { + prefixes.put(clause.getPrefix(), clause); + for (Equality equality : clause.getCondition().getEquiConditions()) { + equiconditions.put(clause.getPrefix() + equality.getRightColumn(), equality.getLeftExpr()); + } + } + + // List of candidates for pushdown + // CNF normalization will generate either + // - an AND filter with multiple subfilters + // - or a single non-AND subfilter which cannot be split further + List normalizedOrClauses; + if (normalizedFilter instanceof AndFilter) { + normalizedOrClauses = ((AndFilter) normalizedFilter).getFilters(); + } else { + normalizedOrClauses = Collections.singletonList(normalizedFilter); + } + + // Pushdown filters, rewriting if necessary + List leftFilters = new ArrayList<>(); + List rightFilters = new ArrayList<>(); + List pushDownVirtualColumns = new ArrayList<>(); + Map>> correlationCache = new HashMap<>(); + + for (Filter orClause : normalizedOrClauses) { + JoinFilterAnalysis joinFilterAnalysis = analyzeJoinFilterClause( + hashJoinSegmentStorageAdapter, + orClause, + prefixes, + equiconditions, + correlationCache + ); + if (joinFilterAnalysis.isCanPushDown()) { + leftFilters.add(joinFilterAnalysis.getPushDownFilter().get()); + if (!joinFilterAnalysis.getPushDownVirtualColumns().isEmpty()) { + pushDownVirtualColumns.addAll(joinFilterAnalysis.getPushDownVirtualColumns()); + } + } + if (joinFilterAnalysis.isRetainAfterJoin()) { + rightFilters.add(joinFilterAnalysis.getOriginalFilter()); + } + } + + return new JoinFilterSplit( + Filters.and(leftFilters), + Filters.and(rightFilters), + pushDownVirtualColumns + ); + } + + + + /** + * Analyze a filter clause from a filter that is in conjunctive normal form (AND of ORs). + * The clause is expected to be an OR filter or a leaf filter. + * + * @param adapter Adapter for the join + * @param filterClause Individual filter clause (an OR filter or a leaf filter) from a filter that is in CNF + * @param prefixes Map of table prefixes + * @param equiconditions Equicondition map + * @param correlationCache Cache of column correlation analyses. + * + * @return a JoinFilterAnalysis that contains a possible filter rewrite and information on how to handle the filter. + */ + private static JoinFilterAnalysis analyzeJoinFilterClause( + HashJoinSegmentStorageAdapter adapter, + Filter filterClause, + Map prefixes, + Map equiconditions, + Map>> correlationCache + ) + { + // NULL matching conditions are not currently pushed down. + // They require special consideration based on the join type, and for simplicity of the initial implementation + // this is not currently handled. + if (filterMatchesNull(filterClause)) { + return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause); + } + + // Currently we only support rewrites of selector filters and selector filters within OR filters. + if (filterClause instanceof SelectorFilter) { + return rewriteSelectorFilter( + adapter, + (SelectorFilter) filterClause, + prefixes, + equiconditions, + correlationCache + ); + } + + if (filterClause instanceof OrFilter) { + return rewriteOrFilter( + adapter, + (OrFilter) filterClause, + prefixes, + equiconditions, + correlationCache + ); + } + + for (String requiredColumn : filterClause.getRequiredColumns()) { + if (!adapter.isBaseColumn(requiredColumn)) { + return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause); + } + } + return new JoinFilterAnalysis( + false, + filterClause, + filterClause, + ImmutableList.of() + ); + } + + /** + * Potentially rewrite the subfilters of an OR filter so that the whole OR filter can be pushed down to + * the base table. + * + * @param adapter Adapter for the join + * @param orFilter OrFilter to be rewritten + * @param prefixes Map of table prefixes to clauses + * @param equiconditions Map of equiconditions + * @param correlationCache Column correlation analysis cache. This will be potentially modified by adding + * any new column correlation analyses to the cache. + * + * @return A JoinFilterAnalysis indicating how to handle the potentially rewritten filter + */ + private static JoinFilterAnalysis rewriteOrFilter( + HashJoinSegmentStorageAdapter adapter, + OrFilter orFilter, + Map prefixes, + Map equiconditions, + Map>> correlationCache + ) + { + boolean retainRhs = false; + + List newFilters = new ArrayList<>(); + for (Filter filter : orFilter.getFilters()) { + boolean allBaseColumns = true; + for (String requiredColumn : filter.getRequiredColumns()) { + if (!adapter.isBaseColumn(requiredColumn)) { + allBaseColumns = false; + } + } + + if (!allBaseColumns) { + retainRhs = true; + if (filter instanceof SelectorFilter) { + JoinFilterAnalysis rewritten = rewriteSelectorFilter( + adapter, + (SelectorFilter) filter, + prefixes, + equiconditions, + correlationCache + ); + if (!rewritten.isCanPushDown()) { + return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter); + } else { + newFilters.add(rewritten.getPushDownFilter().get()); + } + } else { + return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter); + } + } else { + newFilters.add(filter); + } + } + + return new JoinFilterAnalysis( + retainRhs, + orFilter, + new OrFilter(newFilters), + ImmutableList.of() + ); + } + + /** + * Rewrites a selector filter on a join table into an IN filter on the base table. + * + * @param baseAdapter The adapter for the join + * @param selectorFilter SelectorFilter to be rewritten + * @param prefixes Map of join table prefixes to clauses + * @param equiconditions Map of equiconditions + * @param correlationCache Cache of column correlation analyses. This will be potentially modified by adding + * any new column correlation analyses to the cache. + * + * @return A JoinFilterAnalysis that indicates how to handle the potentially rewritten filter + */ + private static JoinFilterAnalysis rewriteSelectorFilter( + HashJoinSegmentStorageAdapter baseAdapter, + SelectorFilter selectorFilter, + Map prefixes, + Map equiconditions, + Map>> correlationCache + ) + { + String filteringColumn = selectorFilter.getDimension(); + for (Map.Entry prefixAndClause : prefixes.entrySet()) { + if (prefixAndClause.getValue().includesColumn(filteringColumn)) { + Optional> correlations = correlationCache.computeIfAbsent( + prefixAndClause.getKey(), + p -> findCorrelatedBaseTableColumns( + baseAdapter, + p, + prefixes.get(p), + equiconditions + ) + ); + + if (!correlations.isPresent()) { + return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter); + } + + List newFilters = new ArrayList<>(); + List pushdownVirtualColumns = new ArrayList<>(); + + for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : correlations.get()) { + if (correlationAnalysis.supportsPushDown()) { + Set correlatedValues = getCorrelatedValuesForPushDown( + selectorFilter.getDimension(), + selectorFilter.getValue(), + correlationAnalysis.getJoinColumn(), + prefixAndClause.getValue() + ); + + if (correlatedValues.isEmpty()) { + return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter); + } + + for (String correlatedBaseColumn : correlationAnalysis.getBaseColumns()) { + Filter rewrittenFilter = new InDimFilter( + correlatedBaseColumn, + correlatedValues, + null, + null + ).toFilter(); + newFilters.add(rewrittenFilter); + } + + for (Expr correlatedBaseExpr : correlationAnalysis.getBaseExpressions()) { + // We need to create a virtual column for the expressions when pushing down. + // Note that this block is never entered right now, since correlationAnalysis.supportsPushDown() + // will return false if there any correlated expressions on the base table. + // Pushdown of such filters is disabled until the expressions system supports converting an expression + // into a String representation that can be reparsed into the same expression. + // https://github.com/apache/druid/issues/9326 tracks this expressions issue. + String vcName = getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size()); + + VirtualColumn correlatedBaseExprVirtualColumn = new ExpressionVirtualColumn( + vcName, + correlatedBaseExpr, + ValueType.STRING + ); + pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn); + + Filter rewrittenFilter = new InDimFilter( + vcName, + correlatedValues, + null, + null + ).toFilter(); + newFilters.add(rewrittenFilter); + } + } + } + + if (newFilters.isEmpty()) { + return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter); + } + + return new JoinFilterAnalysis( + true, + selectorFilter, + Filters.and(newFilters), + pushdownVirtualColumns + ); + } + } + return new JoinFilterAnalysis( + false, + selectorFilter, + selectorFilter, + ImmutableList.of() + ); + } + + private static String getCorrelatedBaseExprVirtualColumnName(int counter) + { + // May want to have this check other column names to absolutely prevent name conflicts + return PUSH_DOWN_VIRTUAL_COLUMN_NAME_BASE + counter; + } + + /** + * Helper method for rewriting filters on join table columns into filters on base table columns. + * + * @param filterColumn A join table column that we're filtering on + * @param filterValue The value to filter on + * @param correlatedJoinColumn A join table column that appears as the RHS of an equicondition, which we can correlate + * with a column on the base table + * @param clauseForFilteredTable The joinable clause that corresponds to the join table being filtered on + * + * @return A list of values of the correlatedJoinColumn that appear in rows where filterColumn = filterValue + * Returns an empty set if we cannot determine the correlated values. + */ + private static Set getCorrelatedValuesForPushDown( + String filterColumn, + String filterValue, + String correlatedJoinColumn, + JoinableClause clauseForFilteredTable + ) + { + String filterColumnNoPrefix = filterColumn.substring(clauseForFilteredTable.getPrefix().length()); + String correlatedColumnNoPrefix = correlatedJoinColumn.substring(clauseForFilteredTable.getPrefix().length()); + + return clauseForFilteredTable.getJoinable().getCorrelatedColumnValues( + filterColumnNoPrefix, + filterValue, + correlatedColumnNoPrefix + ); + } + + /** + * For each rhs column that appears in the equiconditions for a table's JoinableClause, + * we try to determine what base table columns are related to the rhs column through the total set of equiconditions. + * We do this by searching backwards through the chain of join equiconditions using the provided equicondition map. + * + * For example, suppose we have 3 tables, A,B,C, joined with the following conditions, where A is the base table: + * A.joinColumn == B.joinColumn + * B.joinColum == C.joinColumn + * + * We would determine that C.joinColumn is correlated with A.joinColumn: we first see that + * C.joinColumn is linked to B.joinColumn which in turn is linked to A.joinColumn + * + * Suppose we had the following join conditions instead: + * f(A.joinColumn) == B.joinColumn + * B.joinColum == C.joinColumn + * In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn would be linked to f(A.joinColumn). + * + * Suppose we had the following join conditions instead: + * A.joinColumn == B.joinColumn + * f(B.joinColum) == C.joinColumn + * + * Because we cannot reverse the function f() applied to the second table B in all cases, + * we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn + * + * @param adapter The adapter for the join. Used to determine if a column is a base table column. + * @param tablePrefix Prefix for a join table + * @param clauseForTablePrefix Joinable clause for the prefix + * @param equiConditions Map of equiconditions, keyed by the right hand columns + * + * @return A list of correlatation analyses for the equicondition RHS columns that reside in the table associated with + * the tablePrefix + */ + private static Optional> findCorrelatedBaseTableColumns( + HashJoinSegmentStorageAdapter adapter, + String tablePrefix, + JoinableClause clauseForTablePrefix, + Map equiConditions + ) + { + JoinConditionAnalysis jca = clauseForTablePrefix.getCondition(); + + List rhsColumns = new ArrayList<>(); + for (Equality eq : jca.getEquiConditions()) { + rhsColumns.add(tablePrefix + eq.getRightColumn()); + } + + List correlations = new ArrayList<>(); + + + for (String rhsColumn : rhsColumns) { + List correlatedBaseColumns = new ArrayList<>(); + List correlatedBaseExpressions = new ArrayList<>(); + boolean terminate = false; + String findMappingFor = rhsColumn; + while (!terminate) { + Expr lhs = equiConditions.get(findMappingFor); + if (lhs == null) { + break; + } + + String identifier = lhs.getBindingIfIdentifier(); + if (identifier == null) { + // We push down if the function only requires base table columns + Expr.BindingDetails bindingDetails = lhs.analyzeInputs(); + Set requiredBindings = bindingDetails.getRequiredBindings(); + if (!requiredBindings.stream().allMatch(requiredBinding -> adapter.isBaseColumn(requiredBinding))) { + return Optional.empty(); + } + + terminate = true; + correlatedBaseExpressions.add(lhs); + } else { + // simple identifier, see if we can correlate it with a column on the base table + findMappingFor = identifier; + if (adapter.isBaseColumn(identifier)) { + terminate = true; + correlatedBaseColumns.add(findMappingFor); + } + } + } + + if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) { + return Optional.empty(); + } + + // We should merge correlation analyses if they're for the same rhsColumn + // See https://github.com/apache/druid/issues/9328 + correlations.add( + new JoinFilterColumnCorrelationAnalysis( + rhsColumn, + correlatedBaseColumns, + correlatedBaseExpressions + ) + ); + } + + return Optional.of(correlations); + } + + private static boolean filterMatchesNull(Filter filter) + { + ValueMatcher valueMatcher = filter.makeMatcher(ALL_NULL_COLUMN_SELECTOR_FACTORY); + return valueMatcher.matches(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterColumnCorrelationAnalysis.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterColumnCorrelationAnalysis.java new file mode 100644 index 00000000000..fb569b152bc --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterColumnCorrelationAnalysis.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.filter; + +import org.apache.druid.math.expr.Expr; + +import java.util.List; + +/** + * Represents an analysis of what base table columns, if any, can be correlated with a column that will + * be filtered on. + *

+ * For example, if we're joining on a base table via the equiconditions (id = j.id AND f(id2) = j.id2), + * then we can correlate j.id with id (base table column) and j.id2 with f(id2) (a base table expression). + */ +public class JoinFilterColumnCorrelationAnalysis +{ + private final String joinColumn; + private final List baseColumns; + private final List baseExpressions; + + public JoinFilterColumnCorrelationAnalysis( + String joinColumn, + List baseColumns, + List baseExpressions + ) + { + this.joinColumn = joinColumn; + this.baseColumns = baseColumns; + this.baseExpressions = baseExpressions; + } + + public String getJoinColumn() + { + return joinColumn; + } + + public List getBaseColumns() + { + return baseColumns; + } + + public List getBaseExpressions() + { + return baseExpressions; + } + + public boolean supportsPushDown() + { + return !baseColumns.isEmpty() && baseExpressions.isEmpty(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterSplit.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterSplit.java new file mode 100644 index 00000000000..10235ca162f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterSplit.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join.filter; + +import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.VirtualColumn; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Holds the result of splitting a filter into: + * - a portion that can be pushed down to the base table + * - a portion that will be applied post-join + * - additional virtual columns that need to be created on the base table to support the pushed down filters. + */ +public class JoinFilterSplit +{ + final Optional baseTableFilter; + final Optional joinTableFilter; + final List pushDownVirtualColumns; + + public JoinFilterSplit( + @Nullable Filter baseTableFilter, + @Nullable Filter joinTableFilter, + List pushDownVirtualColumns + ) + { + this.baseTableFilter = baseTableFilter == null ? Optional.empty() : Optional.of(baseTableFilter); + this.joinTableFilter = joinTableFilter == null ? Optional.empty() : Optional.of(joinTableFilter); + this.pushDownVirtualColumns = pushDownVirtualColumns; + } + + public Optional getBaseTableFilter() + { + return baseTableFilter; + } + + public Optional getJoinTableFilter() + { + return joinTableFilter; + } + + public List getPushDownVirtualColumns() + { + return pushDownVirtualColumns; + } + + @Override + public String toString() + { + return "JoinFilterSplit{" + + "baseTableFilter=" + baseTableFilter + + ", joinTableFilter=" + joinTableFilter + + ", pushDownVirtualColumns=" + pushDownVirtualColumns + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JoinFilterSplit that = (JoinFilterSplit) o; + return Objects.equals(getBaseTableFilter(), that.getBaseTableFilter()) && + Objects.equals(getJoinTableFilter(), that.getJoinTableFilter()) && + Objects.equals(getPushDownVirtualColumns(), that.getPushDownVirtualColumns()); + } + + @Override + public int hashCode() + { + return Objects.hash(getBaseTableFilter(), getJoinTableFilter(), getPushDownVirtualColumns()); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java index 47c9af912e1..1a739bc83f4 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.join.lookup; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.query.lookup.LookupExtractor; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnCapabilities; @@ -31,6 +32,7 @@ import org.apache.druid.segment.join.Joinable; import javax.annotation.Nullable; import java.util.List; +import java.util.Set; public class LookupJoinable implements Joinable { @@ -83,4 +85,28 @@ public class LookupJoinable implements Joinable { return LookupJoinMatcher.create(extractor, leftSelectorFactory, condition, remainderNeeded); } + + @Override + public Set getCorrelatedColumnValues( + String searchColumnName, + String searchColumnValue, + String retrievalColumnName + ) + { + Set correlatedValues; + if (LookupColumnSelectorFactory.KEY_COLUMN.equals(searchColumnName)) { + if (LookupColumnSelectorFactory.KEY_COLUMN.equals(retrievalColumnName)) { + correlatedValues = ImmutableSet.of(searchColumnValue); + } else { + correlatedValues = ImmutableSet.of(extractor.apply(searchColumnName)); + } + } else { + if (LookupColumnSelectorFactory.VALUE_COLUMN.equals(retrievalColumnName)) { + correlatedValues = ImmutableSet.of(searchColumnValue); + } else { + correlatedValues = ImmutableSet.copyOf(extractor.unapply(searchColumnValue)); + } + } + return correlatedValues; + } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java index df46d886904..3e2084dc0f6 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java @@ -19,6 +19,8 @@ package org.apache.druid.segment.join.table; +import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.ints.IntList; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.join.JoinConditionAnalysis; @@ -26,7 +28,9 @@ import org.apache.druid.segment.join.JoinMatcher; import org.apache.druid.segment.join.Joinable; import javax.annotation.Nullable; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class IndexedTableJoinable implements Joinable { @@ -75,4 +79,41 @@ public class IndexedTableJoinable implements Joinable remainderNeeded ); } + + @Override + public Set getCorrelatedColumnValues( + String searchColumnName, + String searchColumnValue, + String retrievalColumnName + ) + { + int filterColumnPosition = table.allColumns().indexOf(searchColumnName); + int correlatedColumnPosition = table.allColumns().indexOf(retrievalColumnName); + + if (filterColumnPosition < 0 || correlatedColumnPosition < 0) { + return ImmutableSet.of(); + } + + Set correlatedValues = new HashSet<>(); + if (table.keyColumns().contains(searchColumnName)) { + IndexedTable.Index index = table.columnIndex(filterColumnPosition); + IndexedTable.Reader reader = table.columnReader(correlatedColumnPosition); + IntList rowIndex = index.find(searchColumnValue); + for (int i = 0; i < rowIndex.size(); i++) { + int rowNum = rowIndex.getInt(i); + correlatedValues.add(reader.read(rowNum).toString()); + } + return correlatedValues; + } else { + IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition); + IndexedTable.Reader correlatedColumnReader = table.columnReader(correlatedColumnPosition); + for (int i = 0; i < table.numRows(); i++) { + if (searchColumnValue.equals(dimNameReader.read(i).toString())) { + correlatedValues.add(correlatedColumnReader.read(i).toString()); + } + } + + return correlatedValues; + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java index 0d9feefc459..48236515293 100644 --- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java @@ -21,7 +21,9 @@ package org.apache.druid.segment.virtual; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -62,6 +64,23 @@ public class ExpressionVirtualColumn implements VirtualColumn this.parsedExpression = Suppliers.memoize(() -> Parser.parse(expression, macroTable)); } + /** + * Constructor for creating an ExpressionVirtualColumn from a pre-parsed expression. + */ + public ExpressionVirtualColumn( + String name, + Expr parsedExpression, + ValueType outputType + ) + { + this.name = Preconditions.checkNotNull(name, "name"); + // Unfortunately this string representation can't be reparsed into the same expression, might be useful + // if the expression system supported that + this.expression = parsedExpression.toString(); + this.outputType = outputType != null ? outputType : ValueType.FLOAT; + this.parsedExpression = Suppliers.ofInstance(parsedExpression); + } + @JsonProperty("name") @Override public String getOutputName() @@ -81,6 +100,13 @@ public class ExpressionVirtualColumn implements VirtualColumn return outputType; } + @JsonIgnore + @VisibleForTesting + public Supplier getParsedExpression() + { + return parsedExpression; + } + @Override public DimensionSelector makeDimensionSelector( final DimensionSpec dimensionSpec, diff --git a/processing/src/test/java/org/apache/druid/segment/filter/AndFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/AndFilterTest.java index c469c31356e..3af7bf89ec1 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/AndFilterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/AndFilterTest.java @@ -22,6 +22,7 @@ package org.apache.druid.segment.filter; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; @@ -175,4 +176,10 @@ public class AndFilterTest extends BaseFilterTest ImmutableList.of("0", "1", "2", "3", "4", "5") ); } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(AndFilter.class).usingGetClass().withNonnullFields("filters").verify(); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java index f66df163ec7..cda4eba50f4 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java @@ -21,6 +21,7 @@ package org.apache.druid.segment.filter; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Pair; @@ -721,4 +722,14 @@ public class BoundFilterTest extends BaseFilterTest ImmutableList.of("1", "2", "4", "5", "6") ); } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(BoundFilter.class) + .usingGetClass() + .withNonnullFields("boundDimFilter", "comparator") + .withIgnoredFields("longPredicateSupplier", "floatPredicateSupplier", "doublePredicateSupplier") + .verify(); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java index 77aa41ff9c2..e4a8fde5750 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -348,6 +349,16 @@ public class InFilterTest extends BaseFilterTest } + @Test + public void test_equals() + { + EqualsVerifier.forClass(InFilter.class) + .usingGetClass() + .withNonnullFields("dimension", "values") + .withIgnoredFields("longPredicateSupplier", "floatPredicateSupplier", "doublePredicateSupplier") + .verify(); + } + private DimFilter toInFilter(String dim) { List emptyList = new ArrayList<>(); diff --git a/processing/src/test/java/org/apache/druid/segment/filter/OrFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/OrFilterTest.java new file mode 100644 index 00000000000..844a0895e94 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/filter/OrFilterTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.filter; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class OrFilterTest +{ + @Test + public void test_equals() + { + EqualsVerifier.forClass(OrFilter.class).usingGetClass().withNonnullFields("filters").verify(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SelectorFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SelectorFilterTest.java index 38c817fe9e0..f0d5fe95583 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/SelectorFilterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/SelectorFilterTest.java @@ -22,6 +22,7 @@ package org.apache.druid.segment.filter; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.extraction.MapLookupExtractor; @@ -328,4 +329,10 @@ public class SelectorFilterTest extends BaseFilterTest assertFilterMatches(new SelectorDimFilter("l0", null, null), ImmutableList.of("3")); } } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(SelectorFilter.class).usingGetClass().withNonnullFields("dimension").verify(); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/BaseHashJoinSegmentStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/join/BaseHashJoinSegmentStorageAdapterTest.java new file mode 100644 index 00000000000..6ebcf29a2d5 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/BaseHashJoinSegmentStorageAdapterTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.lookup.LookupExtractor; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.join.lookup.LookupJoinable; +import org.apache.druid.segment.join.table.IndexedTable; +import org.apache.druid.segment.join.table.IndexedTableJoinable; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.timeline.SegmentId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + +public class BaseHashJoinSegmentStorageAdapterTest +{ + public static final String FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX = "c1."; + public static final String FACT_TO_COUNTRY_ON_NUMBER_PREFIX = "c2."; + public static final String FACT_TO_REGION_PREFIX = "r1."; + public static final String REGION_TO_COUNTRY_PREFIX = "rtc."; + public static Long NULL_COUNTRY; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + public QueryableIndexSegment factSegment; + public LookupExtractor countryIsoCodeToNameLookup; + public LookupExtractor countryNumberToNameLookup; + public IndexedTable countriesTable; + public IndexedTable regionsTable; + + @BeforeClass + public static void setUpStatic() + { + NullHandling.initializeForTests(); + NULL_COUNTRY = NullHandling.sqlCompatible() ? null : 0L; + } + + @Before + public void setUp() throws IOException + { + factSegment = new QueryableIndexSegment( + JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(), + SegmentId.dummy("facts") + ); + countryIsoCodeToNameLookup = JoinTestHelper.createCountryIsoCodeToNameLookup(); + countryNumberToNameLookup = JoinTestHelper.createCountryNumberToNameLookup(); + countriesTable = JoinTestHelper.createCountriesIndexedTable(); + regionsTable = JoinTestHelper.createRegionsIndexedTable(); + } + + @After + public void tearDown() + { + if (factSegment != null) { + factSegment.close(); + } + } + + protected JoinableClause factToCountryNameUsingIsoCodeLookup(final JoinType joinType) + { + return new JoinableClause( + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, + LookupJoinable.wrap(countryIsoCodeToNameLookup), + joinType, + JoinConditionAnalysis.forExpression( + StringUtils.format("\"%sk\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX), + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, + ExprMacroTable.nil() + ) + ); + } + + protected JoinableClause factToCountryNameUsingNumberLookup(final JoinType joinType) + { + return new JoinableClause( + FACT_TO_COUNTRY_ON_NUMBER_PREFIX, + LookupJoinable.wrap(countryNumberToNameLookup), + joinType, + JoinConditionAnalysis.forExpression( + StringUtils.format("\"%sk\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX), + FACT_TO_COUNTRY_ON_NUMBER_PREFIX, + ExprMacroTable.nil() + ) + ); + } + + protected JoinableClause factToCountryOnIsoCode(final JoinType joinType) + { + return new JoinableClause( + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, + new IndexedTableJoinable(countriesTable), + joinType, + JoinConditionAnalysis.forExpression( + StringUtils.format("\"%scountryIsoCode\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX), + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, + ExprMacroTable.nil() + ) + ); + } + + protected JoinableClause factToCountryOnNumber(final JoinType joinType) + { + return new JoinableClause( + FACT_TO_COUNTRY_ON_NUMBER_PREFIX, + new IndexedTableJoinable(countriesTable), + joinType, + JoinConditionAnalysis.forExpression( + StringUtils.format("\"%scountryNumber\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX), + FACT_TO_COUNTRY_ON_NUMBER_PREFIX, + ExprMacroTable.nil() + ) + ); + } + + protected JoinableClause factToRegion(final JoinType joinType) + { + return new JoinableClause( + FACT_TO_REGION_PREFIX, + new IndexedTableJoinable(regionsTable), + joinType, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "\"%sregionIsoCode\" == regionIsoCode && \"%scountryIsoCode\" == countryIsoCode", + FACT_TO_REGION_PREFIX, + FACT_TO_REGION_PREFIX + ), + FACT_TO_REGION_PREFIX, + ExprMacroTable.nil() + ) + ); + } + + protected JoinableClause regionToCountry(final JoinType joinType) + { + return new JoinableClause( + REGION_TO_COUNTRY_PREFIX, + new IndexedTableJoinable(countriesTable), + joinType, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "\"%scountryIsoCode\" == \"%scountryIsoCode\"", + FACT_TO_REGION_PREFIX, + REGION_TO_COUNTRY_PREFIX + ), + REGION_TO_COUNTRY_PREFIX, + ExprMacroTable.nil() + ) + ); + } + + protected HashJoinSegmentStorageAdapter makeFactToCountrySegment() + { + return new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT)) + ); + } + + protected void compareExpressionVirtualColumns( + ExpressionVirtualColumn expectedVirtualColumn, + ExpressionVirtualColumn actualVirtualColumn + ) + { + Assert.assertEquals( + expectedVirtualColumn.getOutputName(), + actualVirtualColumn.getOutputName() + ); + Assert.assertEquals( + expectedVirtualColumn.getOutputType(), + actualVirtualColumn.getOutputType() + ); + Assert.assertEquals( + expectedVirtualColumn.getParsedExpression().get().toString(), + actualVirtualColumn.getParsedExpression().get().toString() + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java index 27de9aaa7ca..5455fdd43ee 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java @@ -30,81 +30,24 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.filter.ExpressionDimFilter; import org.apache.druid.query.filter.OrDimFilter; import org.apache.druid.query.filter.SelectorDimFilter; -import org.apache.druid.query.lookup.LookupExtractor; -import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.join.lookup.LookupJoinable; -import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.IndexedTableJoinable; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; -import org.apache.druid.timeline.SegmentId; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import java.io.IOException; import java.util.Collections; -public class HashJoinSegmentStorageAdapterTest +public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorageAdapterTest { - private static final String FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX = "c1."; - private static final String FACT_TO_COUNTRY_ON_NUMBER_PREFIX = "c2."; - private static final String FACT_TO_REGION_PREFIX = "r1."; - private static final String REGION_TO_COUNTRY_PREFIX = "rtc."; - private static Long NULL_COUNTRY; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - public QueryableIndexSegment factSegment; - public LookupExtractor countryIsoCodeToNameLookup; - public LookupExtractor countryNumberToNameLookup; - public IndexedTable countriesTable; - public IndexedTable regionsTable; - - @BeforeClass - public static void setUpStatic() - { - NullHandling.initializeForTests(); - NULL_COUNTRY = NullHandling.sqlCompatible() ? null : 0L; - } - - @Before - public void setUp() throws IOException - { - factSegment = new QueryableIndexSegment( - JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(), - SegmentId.dummy("facts") - ); - countryIsoCodeToNameLookup = JoinTestHelper.createCountryIsoCodeToNameLookup(); - countryNumberToNameLookup = JoinTestHelper.createCountryNumberToNameLookup(); - countriesTable = JoinTestHelper.createCountriesIndexedTable(); - regionsTable = JoinTestHelper.createRegionsIndexedTable(); - } - - @After - public void tearDown() - { - if (factSegment != null) { - factSegment.close(); - } - } - @Test public void test_getInterval_factToCountry() { Assert.assertEquals( - Intervals.of("2015-09-12/2015-09-12T02:33:40.060Z"), + Intervals.of("2015-09-12/2015-09-12T04:43:40.060Z"), makeFactToCountrySegment().getInterval() ); } @@ -145,7 +88,7 @@ public class HashJoinSegmentStorageAdapterTest public void test_getDimensionCardinality_factToCountryFactColumn() { Assert.assertEquals( - 15, + 17, makeFactToCountrySegment().getDimensionCardinality("countryIsoCode") ); } @@ -154,7 +97,7 @@ public class HashJoinSegmentStorageAdapterTest public void test_getDimensionCardinality_factToCountryJoinColumn() { Assert.assertEquals( - 15, + 17, makeFactToCountrySegment().getDimensionCardinality(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName") ); } @@ -190,7 +133,7 @@ public class HashJoinSegmentStorageAdapterTest public void test_getMaxTime_factToCountry() { Assert.assertEquals( - DateTimes.of("2015-09-12T02:33:40.059Z"), + DateTimes.of("2015-09-12T04:43:40.059Z"), makeFactToCountrySegment().getMaxTime() ); } @@ -325,7 +268,7 @@ public class HashJoinSegmentStorageAdapterTest public void test_getMaxIngestedEventTime_factToCountry() { Assert.assertEquals( - DateTimes.of("2015-09-12T02:33:40.059Z"), + DateTimes.of("2015-09-12T04:43:40.059Z"), makeFactToCountrySegment().getMaxIngestedEventTime() ); } @@ -396,7 +339,9 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Wendigo", "SV", "SV", "El Salvador", 12L}, new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L}, new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L}, - new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L} + new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}, + new Object[]{"Orange Soda", "MatchNothing", null, null, NULL_COUNTRY} ) ); } @@ -444,7 +389,8 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Wendigo", "SV", "SV", "El Salvador", 12L}, new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L}, new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L}, - new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L} + new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} ) ); } @@ -491,7 +437,8 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Wendigo", "SV", "SV", "El Salvador"}, new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway"}, new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador"}, - new Object[]{"Old Anatolian Turkish", "US", "US", "United States"} + new Object[]{"Old Anatolian Turkish", "US", "US", "United States"}, + new Object[]{"Cream Soda", "SU", "SU", "States United"} ) ); } @@ -532,7 +479,8 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Carlo Curti", "US", "US", "United States", 13L}, new Object[]{"Giusy Ferreri discography", "IT", "IT", "Italy", 7L}, new Object[]{"Roma-Bangkok", "IT", "IT", "Italy", 7L}, - new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L} + new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} ) : ImmutableList.of( new Object[]{"Talk:Oswald Tilghman", null, "AU", "Australia", 0L}, @@ -545,7 +493,8 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Carlo Curti", "US", "US", "United States", 13L}, new Object[]{"Giusy Ferreri discography", "IT", "IT", "Italy", 7L}, new Object[]{"Roma-Bangkok", "IT", "IT", "Italy", 7L}, - new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L} + new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} ) ); } @@ -584,7 +533,8 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Carlo Curti", "US", "United States"}, new Object[]{"Giusy Ferreri discography", "IT", "Italy"}, new Object[]{"Roma-Bangkok", "IT", "Italy"}, - new Object[]{"Old Anatolian Turkish", "US", "United States"} + new Object[]{"Old Anatolian Turkish", "US", "United States"}, + new Object[]{"Cream Soda", "SU", "States United"} ) : ImmutableList.of( new Object[]{"Talk:Oswald Tilghman", null, "Australia"}, @@ -597,7 +547,8 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Carlo Curti", "US", "United States"}, new Object[]{"Giusy Ferreri discography", "IT", "Italy"}, new Object[]{"Roma-Bangkok", "IT", "Italy"}, - new Object[]{"Old Anatolian Turkish", "US", "United States"} + new Object[]{"Old Anatolian Turkish", "US", "United States"}, + new Object[]{"Cream Soda", "SU", "States United"} ) ); } @@ -654,7 +605,8 @@ public class HashJoinSegmentStorageAdapterTest FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber" ), ImmutableList.of( - new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L} + new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L}, + new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "USCA", "Usca", 16L} ) ); } @@ -683,7 +635,8 @@ public class HashJoinSegmentStorageAdapterTest FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber" ), ImmutableList.of( - new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L} + new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L}, + new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "USCA", "Usca", 16L} ) ); } @@ -846,7 +799,8 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Wendigo", "SV", "SV", "El Salvador", 12L}, new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L}, new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L}, - new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L} + new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} ) ); } @@ -900,7 +854,9 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Wendigo", "Departamento de San Salvador", "El Salvador"}, new Object[]{"Алиса в Зазеркалье", "Finnmark Fylke", "Norway"}, new Object[]{"Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "Ecuador"}, - new Object[]{"Old Anatolian Turkish", "Virginia", "United States"} + new Object[]{"Old Anatolian Turkish", "Virginia", "United States"}, + new Object[]{"Cream Soda", "Ainigriv", "States United"}, + new Object[]{"Orange Soda", null, null} ) ); } @@ -950,7 +906,9 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Diskussion:Sebastian Schulz", "Norway"}, new Object[]{"Diskussion:Sebastian Schulz", "El Salvador"}, new Object[]{"Diskussion:Sebastian Schulz", "United States"}, - new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"} + new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"}, + new Object[]{"Diskussion:Sebastian Schulz", "States United"}, + new Object[]{"Diskussion:Sebastian Schulz", "Usca"} ) ); } @@ -1036,7 +994,9 @@ public class HashJoinSegmentStorageAdapterTest new Object[]{"Diskussion:Sebastian Schulz", "Norway"}, new Object[]{"Diskussion:Sebastian Schulz", "El Salvador"}, new Object[]{"Diskussion:Sebastian Schulz", "United States"}, - new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"} + new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"}, + new Object[]{"Diskussion:Sebastian Schulz", "States United"}, + new Object[]{"Diskussion:Sebastian Schulz", "Usca"} ) ); } @@ -1287,104 +1247,4 @@ public class HashJoinSegmentStorageAdapterTest ImmutableList.of() ); } - - private JoinableClause factToCountryNameUsingIsoCodeLookup(final JoinType joinType) - { - return new JoinableClause( - FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, - LookupJoinable.wrap(countryIsoCodeToNameLookup), - joinType, - JoinConditionAnalysis.forExpression( - StringUtils.format("\"%sk\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX), - FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, - ExprMacroTable.nil() - ) - ); - } - - private JoinableClause factToCountryNameUsingNumberLookup(final JoinType joinType) - { - return new JoinableClause( - FACT_TO_COUNTRY_ON_NUMBER_PREFIX, - LookupJoinable.wrap(countryNumberToNameLookup), - joinType, - JoinConditionAnalysis.forExpression( - StringUtils.format("\"%sk\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX), - FACT_TO_COUNTRY_ON_NUMBER_PREFIX, - ExprMacroTable.nil() - ) - ); - } - - private JoinableClause factToCountryOnIsoCode(final JoinType joinType) - { - return new JoinableClause( - FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, - new IndexedTableJoinable(countriesTable), - joinType, - JoinConditionAnalysis.forExpression( - StringUtils.format("\"%scountryIsoCode\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX), - FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, - ExprMacroTable.nil() - ) - ); - } - - private JoinableClause factToCountryOnNumber(final JoinType joinType) - { - return new JoinableClause( - FACT_TO_COUNTRY_ON_NUMBER_PREFIX, - new IndexedTableJoinable(countriesTable), - joinType, - JoinConditionAnalysis.forExpression( - StringUtils.format("\"%scountryNumber\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX), - FACT_TO_COUNTRY_ON_NUMBER_PREFIX, - ExprMacroTable.nil() - ) - ); - } - - private JoinableClause factToRegion(final JoinType joinType) - { - return new JoinableClause( - FACT_TO_REGION_PREFIX, - new IndexedTableJoinable(regionsTable), - joinType, - JoinConditionAnalysis.forExpression( - StringUtils.format( - "\"%sregionIsoCode\" == regionIsoCode && \"%scountryIsoCode\" == countryIsoCode", - FACT_TO_REGION_PREFIX, - FACT_TO_REGION_PREFIX - ), - FACT_TO_REGION_PREFIX, - ExprMacroTable.nil() - ) - ); - } - - private JoinableClause regionToCountry(final JoinType joinType) - { - return new JoinableClause( - REGION_TO_COUNTRY_PREFIX, - new IndexedTableJoinable(countriesTable), - joinType, - JoinConditionAnalysis.forExpression( - StringUtils.format( - "\"%scountryIsoCode\" == \"%scountryIsoCode\"", - FACT_TO_REGION_PREFIX, - REGION_TO_COUNTRY_PREFIX - ), - REGION_TO_COUNTRY_PREFIX, - ExprMacroTable.nil() - ) - ); - } - - private HashJoinSegmentStorageAdapter makeFactToCountrySegment() - { - return new HashJoinSegmentStorageAdapter( - factSegment.asStorageAdapter(), - ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT)) - ); - } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java new file mode 100644 index 00000000000..16ee7173c4a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java @@ -0,0 +1,1256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.BoundDimFilter; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.filter.AndFilter; +import org.apache.druid.segment.filter.BoundFilter; +import org.apache.druid.segment.filter.OrFilter; +import org.apache.druid.segment.filter.SelectorFilter; +import org.apache.druid.segment.join.filter.JoinFilterAnalyzer; +import org.apache.druid.segment.join.filter.JoinFilterSplit; +import org.apache.druid.segment.join.table.IndexedTableJoinable; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTest +{ + @Test + public void test_filterPushDown_factToRegionToCountryLeftFilterOnChannel() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new SelectorFilter("channel", "#en.wikipedia"); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new SelectorFilter("channel", "#en.wikipedia"), + null, + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"Talk:Oswald Tilghman", null, null}, + new Object[]{"Peremptory norm", "New South Wales", "Australia"}, + new Object[]{"President of India", "California", "United States"}, + new Object[]{"Glasgow", "Kingston upon Hull", "United Kingdom"}, + new Object[]{"Otjiwarongo Airport", "California", "United States"}, + new Object[]{"Sarah Michelle Gellar", "Ontario", "Canada"}, + new Object[]{"DirecTV", "North Carolina", "United States"}, + new Object[]{"Carlo Curti", "California", "United States"}, + new Object[]{"Giusy Ferreri discography", "Provincia di Varese", "Italy"}, + new Object[]{"Roma-Bangkok", "Provincia di Varese", "Italy"}, + new Object[]{"Old Anatolian Turkish", "Virginia", "United States"}, + new Object[]{"Cream Soda", "Ainigriv", "States United"}, + new Object[][]{new Object[]{"Orange Soda", null, null}} + ) + ); + } + + @Test + public void test_filterPushDown_factToRegionExprToCountryLeftFilterOnCountryName() + { + JoinableClause regionExprToCountry = new JoinableClause( + REGION_TO_COUNTRY_PREFIX, + new IndexedTableJoinable(countriesTable), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "reverse(\"%scountryIsoCode\") == \"%scountryIsoCode\"", + FACT_TO_REGION_PREFIX, + REGION_TO_COUNTRY_PREFIX + ), + REGION_TO_COUNTRY_PREFIX, + ExprMacroTable.nil() + ) + ); + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionExprToCountry + ) + ); + Filter originalFilter = new SelectorFilter("rtc.countryName", "United States"); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + null, + new SelectorFilter("rtc.countryName", "United States"), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"Cream Soda", "Ainigriv", "United States"} + ) + ); + } + + @Test + public void test_filterPushDown_factToRegionToCountryLeftFilterOnChannelAndCountryName() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new SelectorFilter("rtc.countryName", "United States") + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new InDimFilter("countryIsoCode", ImmutableSet.of("US"), null, null).toFilter() + ) + ), + new SelectorFilter("rtc.countryName", "United States"), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"President of India", "California", "United States"}, + new Object[]{"Otjiwarongo Airport", "California", "United States"}, + new Object[]{"DirecTV", "North Carolina", "United States"}, + new Object[]{"Carlo Curti", "California", "United States"}, + new Object[]{"Old Anatolian Turkish", "Virginia", "United States"} + ) + ); + } + + @Test + public void test_filterPushDown_factToRegionToCountryLeftFilterOnNullColumns() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("countryIsoCode", null), + new SelectorFilter("countryNumber", null), + new SelectorFilter("rtc.countryName", null), + new SelectorFilter("r1.regionName", null) + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + null, + new AndFilter( + ImmutableList.of( + new SelectorFilter("countryIsoCode", null), + new SelectorFilter("countryNumber", null), + new SelectorFilter("rtc.countryName", null), + new SelectorFilter("r1.regionName", null) + ) + ), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + NullHandling.sqlCompatible() ? + ImmutableList.of( + new Object[]{"Talk:Oswald Tilghman", null, null}, + new Object[]{"Rallicula", null, null}, + new Object[]{"Apamea abruzzorum", null, null}, + new Object[]{"Atractus flammigerus", null, null}, + new Object[]{"Agama mossambica", null, null} + ) : + ImmutableList.of() // when not running in SQL compatible mode, countryNumber does not have nulls + ); + } + + @Test + public void test_filterPushDown_factToRegionToCountryLeftFilterOnInvalidColumns() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("baseTableInvalidColumn", "abcd"), + new SelectorFilter("rtc.invalidColumn", "abcd"), + new SelectorFilter("r1.invalidColumn", "abcd") + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new SelectorFilter("baseTableInvalidColumn", "abcd"), + new AndFilter( + ImmutableList.of( + new SelectorFilter("rtc.invalidColumn", "abcd"), + new SelectorFilter("r1.invalidColumn", "abcd") + ) + ), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of() + ); + } + + @Test + public void test_filterPushDown_factToRegionToCountryLeftFilterOnChannelVirtualColumn() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("v1", "virtual-column-#en.wikipedia") + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new SelectorFilter("v1", "virtual-column-#en.wikipedia"), + null, + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "v1", + "concat('virtual-column-', \"channel\")", + ValueType.STRING, + TestExprMacroTable.INSTANCE + ) + ) + ), + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"Talk:Oswald Tilghman", null, null}, + new Object[]{"Peremptory norm", "New South Wales", "Australia"}, + new Object[]{"President of India", "California", "United States"}, + new Object[]{"Glasgow", "Kingston upon Hull", "United Kingdom"}, + new Object[]{"Otjiwarongo Airport", "California", "United States"}, + new Object[]{"Sarah Michelle Gellar", "Ontario", "Canada"}, + new Object[]{"DirecTV", "North Carolina", "United States"}, + new Object[]{"Carlo Curti", "California", "United States"}, + new Object[]{"Giusy Ferreri discography", "Provincia di Varese", "Italy"}, + new Object[]{"Roma-Bangkok", "Provincia di Varese", "Italy"}, + new Object[]{"Old Anatolian Turkish", "Virginia", "United States"}, + new Object[]{"Cream Soda", "Ainigriv", "States United"}, + new Object[][]{new Object[]{"Orange Soda", null, null}} + ) + ); + } + + @Test + public void test_filterPushDown_factToRegionToCountryLeftFilterNormalizedAlreadyPushDownVariety() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#fr.wikipedia"), + new BoundFilter(new BoundDimFilter( + "page", + "Les Argonautes", + "Les Argonautes", + false, + false, + null, + null, + null + )), + new SelectorFilter("rtc.countryName", "Canada"), + new BoundFilter(new BoundDimFilter( + "rtc.countryName", + "Canada", + "Canada", + false, + false, + null, + null, + null + )), + new OrFilter( + ImmutableList.of( + new SelectorFilter("namespace", "main"), + new BoundFilter(new BoundDimFilter( + "user", + "24.122.168.111", + "24.122.168.111", + false, + false, + null, + null, + null + )) + ) + ), + new OrFilter( + ImmutableList.of( + new SelectorFilter("namespace", "main"), + new BoundFilter(new BoundDimFilter( + "r1.regionName", + "Quebec", + "Quebec", + false, + false, + null, + null, + null + )) + ) + ) + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#fr.wikipedia"), + new BoundFilter(new BoundDimFilter( + "page", + "Les Argonautes", + "Les Argonautes", + false, + false, + null, + null, + null + )), + new InDimFilter("countryIsoCode", ImmutableSet.of("CA"), null, null).toFilter(), + new OrFilter( + ImmutableList.of( + new SelectorFilter("namespace", "main"), + new BoundFilter(new BoundDimFilter( + "user", + "24.122.168.111", + "24.122.168.111", + false, + false, + null, + null, + null + )) + ) + ) + ) + ), + new AndFilter( + ImmutableList.of( + new SelectorFilter("rtc.countryName", "Canada"), + new BoundFilter(new BoundDimFilter( + "rtc.countryName", + "Canada", + "Canada", + false, + false, + null, + null, + null + )), + new OrFilter( + ImmutableList.of( + new SelectorFilter("namespace", "main"), + new BoundFilter(new BoundDimFilter( + "r1.regionName", + "Quebec", + "Quebec", + false, + false, + null, + null, + null + )) + ) + ) + ) + ), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"Les Argonautes", "Quebec", "Canada"} + ) + ); + } + + // Rewriting filters on rhs columns is currently disabled when the lhs of the equicondition is an expression + @Ignore + @Test + public void test_filterPushDown_factExpressionsToRegionToCountryLeftFilterOnChannelAndCountryName() + { + JoinableClause factExprToRegon = new JoinableClause( + FACT_TO_REGION_PREFIX, + new IndexedTableJoinable(regionsTable), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "\"%sregionIsoCode\" == reverse(regionIsoCode) && \"%scountryIsoCode\" == reverse(countryIsoCode)", + FACT_TO_REGION_PREFIX, + FACT_TO_REGION_PREFIX + ), + FACT_TO_REGION_PREFIX, + ExprMacroTable.nil() + ) + ); + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factExprToRegon, + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new SelectorFilter("rtc.countryName", "States United") + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new InDimFilter("JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-0", ImmutableSet.of("SU"), null, null).toFilter() + ) + ), + new SelectorFilter("rtc.countryName", "States United"), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + ExpressionVirtualColumn expectedVirtualColumn = new ExpressionVirtualColumn( + "JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-0", + "reverse(countryIsoCode)", + ValueType.STRING, + ExprMacroTable.nil() + ); + Assert.assertEquals( + expectedFilterSplit.getBaseTableFilter(), + actualFilterSplit.getBaseTableFilter() + ); + Assert.assertEquals( + expectedFilterSplit.getJoinTableFilter(), + actualFilterSplit.getJoinTableFilter() + ); + ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) actualFilterSplit.getPushDownVirtualColumns() + .get(0); + compareExpressionVirtualColumns(expectedVirtualColumn, actualVirtualColumn); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"Old Anatolian Turkish", "Ainigriv", "States United"} + ) + ); + } + + @Test + public void test_filterPushDown_factToRegionToCountryNotEquiJoinLeftFilterOnChannelAndCountryName() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot build hash-join matcher on non-equi-join condition: \"r1.regionIsoCode\" == regionIsoCode && reverse(\"r1.countryIsoCode\") == countryIsoCode"); + + JoinableClause factExprToRegon = new JoinableClause( + FACT_TO_REGION_PREFIX, + new IndexedTableJoinable(regionsTable), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "\"%sregionIsoCode\" == regionIsoCode && reverse(\"%scountryIsoCode\") == countryIsoCode", + FACT_TO_REGION_PREFIX, + FACT_TO_REGION_PREFIX + ), + FACT_TO_REGION_PREFIX, + ExprMacroTable.nil() + ) + ); + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factExprToRegon, + regionToCountry(JoinType.LEFT) + ) + ); + JoinTestHelper.verifyCursors( + adapter.makeCursors( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new SelectorFilter("rtc.countryName", "States United") + ) + ), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"Old Anatolian Turkish", "Ainigriv", "States United"} + ) + ); + } + + @Test + public void test_filterPushDown_factToRegionToCountryLeftUnnormalizedFilter() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factToRegion(JoinType.LEFT), + regionToCountry(JoinType.LEFT) + ) + ); + Filter originalFilter = new OrFilter( + ImmutableList.of( + new SelectorFilter("channel", "#ko.wikipedia"), + new AndFilter( + ImmutableList.of( + new SelectorFilter("rtc.countryName", "United States"), + new SelectorFilter("r1.regionName", "Virginia") + ) + ) + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new OrFilter( + ImmutableList.of( + new SelectorFilter("channel", "#ko.wikipedia"), + new InDimFilter("countryIsoCode", ImmutableSet.of("US"), null, null).toFilter() + ) + ), + new OrFilter( + ImmutableList.of( + new SelectorFilter("channel", "#ko.wikipedia"), + new AndFilter( + ImmutableList.of( + new InDimFilter("regionIsoCode", ImmutableSet.of("VA"), null, null).toFilter(), + new InDimFilter("countryIsoCode", ImmutableSet.of("US"), null, null).toFilter() + ) + ) + ) + ) + ) + ), + new AndFilter( + ImmutableList.of( + new OrFilter( + ImmutableList.of( + new SelectorFilter("channel", "#ko.wikipedia"), + new SelectorFilter("rtc.countryName", "United States") + ) + ), + new OrFilter( + ImmutableList.of( + new SelectorFilter("channel", "#ko.wikipedia"), + new SelectorFilter("r1.regionName", "Virginia") + ) + ) + ) + ), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"유희왕 GX", "Seoul", "Republic of Korea"}, + new Object[]{"Old Anatolian Turkish", "Virginia", "United States"} + ) + ); + } + + // Rewriting filters on rhs columns is currently disabled when the lhs of the equicondition is an expression + @Ignore + @Test + public void test_filterPushDown_factConcatExpressionToCountryLeftFilterOnChannelAndCountryName() + { + JoinableClause factExprToCountry = new JoinableClause( + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, + new IndexedTableJoinable(countriesTable), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "\"%scountryIsoCode\" == concat(countryIsoCode, regionIsoCode)", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + ), + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX, + ExprMacroTable.nil() + ) + ); + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factExprToCountry + ) + ); + Filter filter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new SelectorFilter("c1.countryName", "Usca") + ) + ); + + ExpressionVirtualColumn expectedVirtualColumn = new ExpressionVirtualColumn( + "JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-0", + "concat(countryIsoCode, regionIsoCode)", + ValueType.STRING, + ExprMacroTable.nil() + ); + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new InDimFilter("JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-0", ImmutableSet.of("USCA"), null, null).toFilter() + ) + ), + new SelectorFilter("c1.countryName", "Usca"), + ImmutableList.of( + expectedVirtualColumn + ) + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + filter + ); + Assert.assertEquals( + expectedFilterSplit.getBaseTableFilter(), + actualFilterSplit.getBaseTableFilter() + ); + Assert.assertEquals( + expectedFilterSplit.getJoinTableFilter(), + actualFilterSplit.getJoinTableFilter() + ); + ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) actualFilterSplit.getPushDownVirtualColumns() + .get(0); + compareExpressionVirtualColumns(expectedVirtualColumn, actualVirtualColumn); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + filter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName" + ), + ImmutableList.of( + new Object[]{"President of India", "Usca"}, + new Object[]{"Otjiwarongo Airport", "Usca"}, + new Object[]{"Carlo Curti", "Usca"} + ) + ); + } + + @Test + public void test_filterPushDown_factToCountryRightWithFilterOnChannelAndJoinable() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT)) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#de.wikipedia"), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "Germany") + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#de.wikipedia"), + new InDimFilter("countryIsoCode", ImmutableSet.of("DE"), null, null).toFilter() + ) + ), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "Germany"), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + "countryIsoCode", + "countryNumber", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber" + ), + ImmutableList.of( + new Object[]{"Diskussion:Sebastian Schulz", "DE", 3L, "DE", "Germany", 3L} + ) + ); + } + + @Test + public void test_filterPushDown_factToCountryRightWithFilterOnNullColumns() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT)) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", null) + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + null, + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", null) + ) + ), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + "countryIsoCode", + "countryNumber", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber" + ), + ImmutableList.of() + ); + } + + @Test + public void test_filterPushDown_factToCountryInnerUsingCountryNumberFilterOnChannelAndCountryName() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of(factToCountryOnNumber(JoinType.INNER)) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", "Australia") + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#en.wikipedia"), + new InDimFilter("countryNumber", ImmutableSet.of("0"), null, null).toFilter() + ) + ), + new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", "Australia"), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + // In non-SQL-compatible mode, we get an extra row, since the 'null' countryNumber for "Talk:Oswald Tilghman" + // is interpreted as 0 (a.k.a. Australia). + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + "countryIsoCode", + FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryIsoCode", + FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", + FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryNumber" + ), + NullHandling.sqlCompatible() ? + ImmutableList.of( + new Object[]{"Peremptory norm", "AU", "AU", "Australia", 0L} + ) : + ImmutableList.of( + new Object[]{"Talk:Oswald Tilghman", null, "AU", "Australia", 0L}, + new Object[]{"Peremptory norm", "AU", "AU", "Australia", 0L} + ) + ); + } + + @Test + public void test_filterPushDown_factToCountryInnerUsingCountryNumberFilterOnNulls() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of(factToCountryOnNumber(JoinType.INNER)) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", null) + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + null, + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", null) + ) + ), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + "countryIsoCode", + FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryIsoCode", + FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", + FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryNumber" + ), + ImmutableList.of() + ); + } + + @Test + public void test_filterPushDown_factToCountryFullWithFilterOnChannelAndCountryName() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of(factToCountryOnIsoCode(JoinType.FULL)) + ); + Filter filter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#es.wikipedia"), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "El Salvador") + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", "#es.wikipedia"), + new InDimFilter("countryIsoCode", ImmutableSet.of("SV"), null, null).toFilter() + ) + ), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "El Salvador"), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + filter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + filter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + "countryIsoCode", + "countryNumber", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber" + ), + ImmutableList.of( + new Object[]{"Wendigo", "SV", 12L, "SV", "El Salvador", 12L} + ) + ); + } + + @Test + public void test_filterPushDown_factToCountryFullWithFilterOnNulls() + { + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of(factToCountryOnIsoCode(JoinType.FULL)) + ); + Filter originalFilter = new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", null) + ) + ); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + null, + new AndFilter( + ImmutableList.of( + new SelectorFilter("channel", null), + new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", null) + ) + ), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + "countryIsoCode", + "countryNumber", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", + FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber" + ), + ImmutableList.of() + ); + } + + + @Test + public void test_filterPushDown_factToRegionTwoColumnsToOneRHSColumnAndFilterOnRHS() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot build hash-join matcher on non-key-based condition: Equality{leftExpr=countryIsoCode, rightColumn='regionIsoCode_0'}"); + + JoinableClause factExprToRegon = new JoinableClause( + FACT_TO_REGION_PREFIX, + new IndexedTableJoinable(regionsTable), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "\"%sregionIsoCode\" == regionIsoCode && \"%sregionIsoCode\" == countryIsoCode", + FACT_TO_REGION_PREFIX, + FACT_TO_REGION_PREFIX + ), + FACT_TO_REGION_PREFIX, + ExprMacroTable.nil() + ) + ); + + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factExprToRegon + ) + ); + Filter originalFilter = new SelectorFilter("r1.regionName", "Blah"); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + null, + new SelectorFilter("r1.regionName", "Blah"), + ImmutableList.of() + ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName", + REGION_TO_COUNTRY_PREFIX + "countryName" + ), + ImmutableList.of() + ); + } + + + @Test + public void test_JoinFilterSplit_equals() + { + EqualsVerifier.forClass(JoinFilterSplit.class) + .usingGetClass() + .withNonnullFields("baseTableFilter", "pushDownVirtualColumns") + .verify(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java index fed0a701985..8bcbe5ea82c 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java @@ -93,7 +93,7 @@ public class RowBasedIndexedTableTest @Test public void test_numRows_countries() { - Assert.assertEquals(15, countriesTable.numRows()); + Assert.assertEquals(17, countriesTable.numRows()); } @Test diff --git a/processing/src/test/resources/wikipedia/countries.json b/processing/src/test/resources/wikipedia/countries.json index ac813e60b6c..6c71bb3bc47 100644 --- a/processing/src/test/resources/wikipedia/countries.json +++ b/processing/src/test/resources/wikipedia/countries.json @@ -13,3 +13,5 @@ {"countryNumber":12,"countryIsoCode":"SV","countryName":"El Salvador"} {"countryNumber":13,"countryIsoCode":"US","countryName":"United States"} {"countryNumber":14,"countryIsoCode":"AX","countryName":"Atlantis"} +{"countryNumber":15,"countryIsoCode":"SU","countryName":"States United"} +{"countryNumber":16,"countryIsoCode":"USCA","countryName":"Usca"} diff --git a/processing/src/test/resources/wikipedia/data.json b/processing/src/test/resources/wikipedia/data.json index 026e39c1870..eac2911afad 100644 --- a/processing/src/test/resources/wikipedia/data.json +++ b/processing/src/test/resources/wikipedia/data.json @@ -24,3 +24,5 @@ {"time":"2015-09-12T01:01:00.474Z","channel":"#ru.wikipedia","regionIsoCode":"20","countryNumber":11,"countryIsoCode":"NO","user":"85.113.179.226","delta":48,"isRobot":false,"isAnonymous":true,"page":"Алиса в Зазеркалье","namespace":"Main"} {"time":"2015-09-12T01:02:08.440Z","channel":"#es.wikipedia","regionIsoCode":"G","countryNumber":4,"countryIsoCode":"EC","user":"181.39.132.136","delta":29,"isRobot":false,"isAnonymous":true,"page":"Gabinete Ministerial de Rafael Correa","namespace":"Main"} {"time":"2015-09-12T02:33:40.059Z","channel":"#en.wikipedia","regionIsoCode":"VA","countryNumber":13,"countryIsoCode":"US","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Old Anatolian Turkish","namespace":"Main"} +{"time":"2015-09-12T03:43:40.059Z","channel":"#en.wikipedia","regionIsoCode":"AV","countryNumber":15,"countryIsoCode":"SU","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Cream Soda","namespace":"Main"} +{"time":"2015-09-12T04:43:40.059Z","channel":"#en.wikipedia","regionIsoCode":"MatchNothing","countryNumber":99,"countryIsoCode":"MatchNothing","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Orange Soda","namespace":"Main"} diff --git a/processing/src/test/resources/wikipedia/regions.json b/processing/src/test/resources/wikipedia/regions.json index 2f22bfa9966..6d44b519f85 100644 --- a/processing/src/test/resources/wikipedia/regions.json +++ b/processing/src/test/resources/wikipedia/regions.json @@ -15,3 +15,5 @@ {"regionIsoCode":"SS","countryIsoCode":"SV","regionName":"Departamento de San Salvador"} {"regionIsoCode":"VA","countryIsoCode":"IT","regionName":"Provincia di Varese"} {"regionIsoCode":"VA","countryIsoCode":"US","regionName":"Virginia"} +{"regionIsoCode":"AV","countryIsoCode":"SU","regionName":"Ainigriv"} +{"regionIsoCode":"ZZ","countryIsoCode":"USCA","regionName":"Usca City"}