Join filter pushdown initial implementation (#9301)

* Join filter pushdown initial implementation

* Fix test and spotbugs check

* Address PR comments

* More PR comments

* Address some PR comments

* Address more PR comments

* Fix TC failures and address PR comments
This commit is contained in:
Jonathan Wei 2020-02-07 16:23:37 -08:00 committed by GitHub
parent e81230f9ab
commit ad8afc565c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 2682 additions and 190 deletions

View File

@ -41,6 +41,7 @@ import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.column.NumericColumn;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.vector.VectorCursor;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -430,15 +431,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
final Filter postFilter;
if (postFilters.size() == 0) {
postFilter = null;
} else if (postFilters.size() == 1) {
postFilter = postFilters.get(0);
} else {
postFilter = new AndFilter(postFilters);
}
if (queryMetrics != null) {
queryMetrics.preFilters(preFilters);
queryMetrics.postFilters(postFilters);
@ -446,7 +438,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
queryMetrics.reportPreFilteredRows(preFilteredRows);
}
return new FilterAnalysis(preFilterBitmap, postFilter);
return new FilterAnalysis(preFilterBitmap, Filters.and(postFilters));
}
@VisibleForTesting

View File

@ -39,6 +39,7 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
*/
@ -234,4 +235,23 @@ public class AndFilter implements BooleanFilter
}
};
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AndFilter andFilter = (AndFilter) o;
return Objects.equals(getFilters(), andFilter.getFilters());
}
@Override
public int hashCode()
{
return Objects.hash(getFilters());
}
}

View File

@ -47,6 +47,7 @@ import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Comparator;
import java.util.Objects;
import java.util.Set;
public class BoundFilter implements Filter
@ -306,4 +307,26 @@ public class BoundFilter implements Filter
}
return (lowerComparing >= 0) && (upperComparing >= 0);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BoundFilter that = (BoundFilter) o;
return Objects.equals(boundDimFilter, that.boundDimFilter) &&
Objects.equals(comparator, that.comparator) &&
Objects.equals(extractionFn, that.extractionFn) &&
Objects.equals(filterTuning, that.filterTuning);
}
@Override
public int hashCode()
{
return Objects.hash(boundDimFilter, comparator, extractionFn, filterTuning);
}
}

View File

@ -641,4 +641,26 @@ public class Filters
}
return false;
}
/**
* Create a filter representing an AND relationship across a list of filters.
*
* @param filterList List of filters
* @return If filterList has more than one element, return an AND filter composed of the filters from filterList
* If filterList has a single element, return that element alone
* If filterList is empty, return null
*/
@Nullable
public static Filter and(List<Filter> filterList)
{
if (filterList.isEmpty()) {
return null;
}
if (filterList.size() == 1) {
return filterList.get(0);
}
return new AndFilter(filterList);
}
}

View File

@ -45,6 +45,7 @@ import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
/**
@ -235,4 +236,26 @@ public class InFilter implements Filter
}
};
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InFilter inFilter = (InFilter) o;
return Objects.equals(dimension, inFilter.dimension) &&
Objects.equals(values, inFilter.values) &&
Objects.equals(extractionFn, inFilter.extractionFn) &&
Objects.equals(filterTuning, inFilter.filterTuning);
}
@Override
public int hashCode()
{
return Objects.hash(dimension, values, extractionFn, filterTuning);
}
}

View File

@ -39,6 +39,7 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
*/
@ -218,4 +219,23 @@ public class OrFilter implements BooleanFilter
}
};
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OrFilter orFilter = (OrFilter) o;
return Objects.equals(getFilters(), orFilter.getFilters());
}
@Override
public int hashCode()
{
return Objects.hash(getFilters());
}
}

View File

@ -34,6 +34,7 @@ import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;
/**
@ -127,4 +128,35 @@ public class SelectorFilter implements Filter
{
return StringUtils.format("%s = %s", dimension, value);
}
public String getDimension()
{
return dimension;
}
public String getValue()
{
return value;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SelectorFilter that = (SelectorFilter) o;
return Objects.equals(getDimension(), that.getDimension()) &&
Objects.equals(getValue(), that.getValue()) &&
Objects.equals(filterTuning, that.filterTuning);
}
@Override
public int hashCode()
{
return Objects.hash(getDimension(), getValue(), filterTuning);
}
}

View File

@ -35,6 +35,8 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterSplit;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -223,13 +225,19 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
}
}
JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(
this,
filter
);
preJoinVirtualColumns.addAll(joinFilterSplit.getPushDownVirtualColumns());
// Soon, we will need a way to push filters past a join when possible. This could potentially be done right here
// (by splitting out pushable pieces of 'filter') or it could be done at a higher level (i.e. in the SQL planner).
//
// If it's done in the SQL planner, that will likely mean adding a 'baseFilter' parameter to this class that would
// be passed in to the below baseAdapter.makeCursors call (instead of the null filter).
final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
null,
joinFilterSplit.getBaseTableFilter().isPresent() ? joinFilterSplit.getBaseTableFilter().get() : null,
interval,
VirtualColumns.create(preJoinVirtualColumns),
gran,
@ -246,16 +254,25 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
retVal = HashJoinEngine.makeJoinCursor(retVal, clause);
}
return PostJoinCursor.wrap(retVal, VirtualColumns.create(postJoinVirtualColumns), filter);
return PostJoinCursor.wrap(
retVal,
VirtualColumns.create(postJoinVirtualColumns),
joinFilterSplit.getJoinTableFilter().isPresent() ? joinFilterSplit.getJoinTableFilter().get() : null
);
}
);
}
public List<JoinableClause> getClauses()
{
return clauses;
}
/**
* Returns whether "column" will be selected from "baseAdapter". This is true if it is not shadowed by any joinables
* (i.e. if it does not start with any of their prefixes).
*/
private boolean isBaseColumn(final String column)
public boolean isBaseColumn(final String column)
{
return !getClauseForColumn(column).isPresent();
}

View File

@ -24,6 +24,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
/**
* Represents something that can be the right-hand side of a join.
@ -75,4 +76,19 @@ public interface Joinable
JoinConditionAnalysis condition,
boolean remainderNeeded
);
/**
* Searches a column from this Joinable for a particular value, finds rows that match,
* and returns values of a second column for those rows.
*
* @param searchColumnName Name of the search column
* @param searchColumnValue Target value of the search column
* @param retrievalColumnName The column to retrieve values from
* @return The set of correlated column values. If we cannot determine correlated values, return an empty set.
*/
Set<String> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName
);
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
public class AllNullColumnSelectorFactory implements ColumnSelectorFactory
{
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
return DimensionSelector.constant(null);
}
@Override
public ColumnValueSelector<?> makeColumnValueSelector(String columnName)
{
return NilColumnValueSelector.instance();
}
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
return null;
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
/**
* Holds information about:
* - whether a filter can be pushed down
* - if it needs to be retained after the join,
* - a reference to the original filter
* - a potentially rewritten filter to be pushed down to the base table
* - a list of virtual columns that need to be created on the base table to support the pushed down filter
*/
public class JoinFilterAnalysis
{
private final boolean retainAfterJoin;
private final Filter originalFilter;
private final Optional<Filter> pushDownFilter;
private final List<VirtualColumn> pushDownVirtualColumns;
public JoinFilterAnalysis(
boolean retainAfterJoin,
Filter originalFilter,
@Nullable Filter pushDownFilter,
List<VirtualColumn> pushDownVirtualColumns
)
{
this.retainAfterJoin = retainAfterJoin;
this.originalFilter = originalFilter;
this.pushDownFilter = pushDownFilter == null ? Optional.empty() : Optional.of(pushDownFilter);
this.pushDownVirtualColumns = pushDownVirtualColumns;
}
public boolean isCanPushDown()
{
return pushDownFilter.isPresent();
}
public boolean isRetainAfterJoin()
{
return retainAfterJoin;
}
public Filter getOriginalFilter()
{
return originalFilter;
}
public Optional<Filter> getPushDownFilter()
{
return pushDownFilter;
}
public List<VirtualColumn> getPushDownVirtualColumns()
{
return pushDownVirtualColumns;
}
/**
* Utility method for generating an analysis that represents: "Filter cannot be pushed down"
*
* @param originalFilter The original filter which cannot be pushed down
*
* @return analysis that represents: "Filter cannot be pushed down"
*/
public static JoinFilterAnalysis createNoPushdownFilterAnalysis(Filter originalFilter)
{
return new JoinFilterAnalysis(
true,
originalFilter,
null,
ImmutableList.of()
);
}
}

View File

@ -0,0 +1,521 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter;
import com.google.common.collect.ImmutableList;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.join.Equality;
import org.apache.druid.segment.join.HashJoinSegmentStorageAdapter;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* When there is a filter in a join query, we can sometimes improve performance by applying parts of the filter
* when we first read from the base table instead of after the join.
*
* This class provides a {@link #splitFilter(HashJoinSegmentStorageAdapter, Filter)} method that
* takes a filter and splits it into a portion that should be applied to the base table prior to the join, and a
* portion that should be applied after the join.
*
* The first step of the filter splitting is to convert the fllter into
* https://en.wikipedia.org/wiki/Conjunctive_normal_form (an AND of ORs). This allows us to consider each
* OR clause independently as a candidate for filter push down to the base table.
*
* A filter clause can be pushed down if it meets one of the following conditions:
* - The filter only applies to columns from the base table
* - The filter applies to columns from the join table, and we determine that the filter can be rewritten
* into a filter on columns from the base table
*
* For the second case, where we rewrite filter clauses, the rewritten clause can be less selective than the original,
* so we preserve the original clause in the post-join filtering phase.
*/
public class JoinFilterAnalyzer
{
private static final String PUSH_DOWN_VIRTUAL_COLUMN_NAME_BASE = "JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-";
private static final ColumnSelectorFactory ALL_NULL_COLUMN_SELECTOR_FACTORY = new AllNullColumnSelectorFactory();
public static JoinFilterSplit splitFilter(
HashJoinSegmentStorageAdapter hashJoinSegmentStorageAdapter,
@Nullable Filter originalFilter
)
{
if (originalFilter == null) {
return new JoinFilterSplit(
null,
null,
ImmutableList.of()
);
}
Filter normalizedFilter = Filters.convertToCNF(originalFilter);
// build the prefix and equicondition maps
// We should check that the prefixes do not duplicate or shadow each other. This is not currently implemented,
// but this is tracked at https://github.com/apache/druid/issues/9329
// We should also consider the case where one RHS column is joined to multiple columns:
// https://github.com/apache/druid/issues/9328
Map<String, Expr> equiconditions = new HashMap<>();
Map<String, JoinableClause> prefixes = new HashMap<>();
for (JoinableClause clause : hashJoinSegmentStorageAdapter.getClauses()) {
prefixes.put(clause.getPrefix(), clause);
for (Equality equality : clause.getCondition().getEquiConditions()) {
equiconditions.put(clause.getPrefix() + equality.getRightColumn(), equality.getLeftExpr());
}
}
// List of candidates for pushdown
// CNF normalization will generate either
// - an AND filter with multiple subfilters
// - or a single non-AND subfilter which cannot be split further
List<Filter> normalizedOrClauses;
if (normalizedFilter instanceof AndFilter) {
normalizedOrClauses = ((AndFilter) normalizedFilter).getFilters();
} else {
normalizedOrClauses = Collections.singletonList(normalizedFilter);
}
// Pushdown filters, rewriting if necessary
List<Filter> leftFilters = new ArrayList<>();
List<Filter> rightFilters = new ArrayList<>();
List<VirtualColumn> pushDownVirtualColumns = new ArrayList<>();
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache = new HashMap<>();
for (Filter orClause : normalizedOrClauses) {
JoinFilterAnalysis joinFilterAnalysis = analyzeJoinFilterClause(
hashJoinSegmentStorageAdapter,
orClause,
prefixes,
equiconditions,
correlationCache
);
if (joinFilterAnalysis.isCanPushDown()) {
leftFilters.add(joinFilterAnalysis.getPushDownFilter().get());
if (!joinFilterAnalysis.getPushDownVirtualColumns().isEmpty()) {
pushDownVirtualColumns.addAll(joinFilterAnalysis.getPushDownVirtualColumns());
}
}
if (joinFilterAnalysis.isRetainAfterJoin()) {
rightFilters.add(joinFilterAnalysis.getOriginalFilter());
}
}
return new JoinFilterSplit(
Filters.and(leftFilters),
Filters.and(rightFilters),
pushDownVirtualColumns
);
}
/**
* Analyze a filter clause from a filter that is in conjunctive normal form (AND of ORs).
* The clause is expected to be an OR filter or a leaf filter.
*
* @param adapter Adapter for the join
* @param filterClause Individual filter clause (an OR filter or a leaf filter) from a filter that is in CNF
* @param prefixes Map of table prefixes
* @param equiconditions Equicondition map
* @param correlationCache Cache of column correlation analyses.
*
* @return a JoinFilterAnalysis that contains a possible filter rewrite and information on how to handle the filter.
*/
private static JoinFilterAnalysis analyzeJoinFilterClause(
HashJoinSegmentStorageAdapter adapter,
Filter filterClause,
Map<String, JoinableClause> prefixes,
Map<String, Expr> equiconditions,
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache
)
{
// NULL matching conditions are not currently pushed down.
// They require special consideration based on the join type, and for simplicity of the initial implementation
// this is not currently handled.
if (filterMatchesNull(filterClause)) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
// Currently we only support rewrites of selector filters and selector filters within OR filters.
if (filterClause instanceof SelectorFilter) {
return rewriteSelectorFilter(
adapter,
(SelectorFilter) filterClause,
prefixes,
equiconditions,
correlationCache
);
}
if (filterClause instanceof OrFilter) {
return rewriteOrFilter(
adapter,
(OrFilter) filterClause,
prefixes,
equiconditions,
correlationCache
);
}
for (String requiredColumn : filterClause.getRequiredColumns()) {
if (!adapter.isBaseColumn(requiredColumn)) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
}
return new JoinFilterAnalysis(
false,
filterClause,
filterClause,
ImmutableList.of()
);
}
/**
* Potentially rewrite the subfilters of an OR filter so that the whole OR filter can be pushed down to
* the base table.
*
* @param adapter Adapter for the join
* @param orFilter OrFilter to be rewritten
* @param prefixes Map of table prefixes to clauses
* @param equiconditions Map of equiconditions
* @param correlationCache Column correlation analysis cache. This will be potentially modified by adding
* any new column correlation analyses to the cache.
*
* @return A JoinFilterAnalysis indicating how to handle the potentially rewritten filter
*/
private static JoinFilterAnalysis rewriteOrFilter(
HashJoinSegmentStorageAdapter adapter,
OrFilter orFilter,
Map<String, JoinableClause> prefixes,
Map<String, Expr> equiconditions,
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache
)
{
boolean retainRhs = false;
List<Filter> newFilters = new ArrayList<>();
for (Filter filter : orFilter.getFilters()) {
boolean allBaseColumns = true;
for (String requiredColumn : filter.getRequiredColumns()) {
if (!adapter.isBaseColumn(requiredColumn)) {
allBaseColumns = false;
}
}
if (!allBaseColumns) {
retainRhs = true;
if (filter instanceof SelectorFilter) {
JoinFilterAnalysis rewritten = rewriteSelectorFilter(
adapter,
(SelectorFilter) filter,
prefixes,
equiconditions,
correlationCache
);
if (!rewritten.isCanPushDown()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
} else {
newFilters.add(rewritten.getPushDownFilter().get());
}
} else {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
}
} else {
newFilters.add(filter);
}
}
return new JoinFilterAnalysis(
retainRhs,
orFilter,
new OrFilter(newFilters),
ImmutableList.of()
);
}
/**
* Rewrites a selector filter on a join table into an IN filter on the base table.
*
* @param baseAdapter The adapter for the join
* @param selectorFilter SelectorFilter to be rewritten
* @param prefixes Map of join table prefixes to clauses
* @param equiconditions Map of equiconditions
* @param correlationCache Cache of column correlation analyses. This will be potentially modified by adding
* any new column correlation analyses to the cache.
*
* @return A JoinFilterAnalysis that indicates how to handle the potentially rewritten filter
*/
private static JoinFilterAnalysis rewriteSelectorFilter(
HashJoinSegmentStorageAdapter baseAdapter,
SelectorFilter selectorFilter,
Map<String, JoinableClause> prefixes,
Map<String, Expr> equiconditions,
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache
)
{
String filteringColumn = selectorFilter.getDimension();
for (Map.Entry<String, JoinableClause> prefixAndClause : prefixes.entrySet()) {
if (prefixAndClause.getValue().includesColumn(filteringColumn)) {
Optional<List<JoinFilterColumnCorrelationAnalysis>> correlations = correlationCache.computeIfAbsent(
prefixAndClause.getKey(),
p -> findCorrelatedBaseTableColumns(
baseAdapter,
p,
prefixes.get(p),
equiconditions
)
);
if (!correlations.isPresent()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
List<Filter> newFilters = new ArrayList<>();
List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : correlations.get()) {
if (correlationAnalysis.supportsPushDown()) {
Set<String> correlatedValues = getCorrelatedValuesForPushDown(
selectorFilter.getDimension(),
selectorFilter.getValue(),
correlationAnalysis.getJoinColumn(),
prefixAndClause.getValue()
);
if (correlatedValues.isEmpty()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
for (String correlatedBaseColumn : correlationAnalysis.getBaseColumns()) {
Filter rewrittenFilter = new InDimFilter(
correlatedBaseColumn,
correlatedValues,
null,
null
).toFilter();
newFilters.add(rewrittenFilter);
}
for (Expr correlatedBaseExpr : correlationAnalysis.getBaseExpressions()) {
// We need to create a virtual column for the expressions when pushing down.
// Note that this block is never entered right now, since correlationAnalysis.supportsPushDown()
// will return false if there any correlated expressions on the base table.
// Pushdown of such filters is disabled until the expressions system supports converting an expression
// into a String representation that can be reparsed into the same expression.
// https://github.com/apache/druid/issues/9326 tracks this expressions issue.
String vcName = getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size());
VirtualColumn correlatedBaseExprVirtualColumn = new ExpressionVirtualColumn(
vcName,
correlatedBaseExpr,
ValueType.STRING
);
pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn);
Filter rewrittenFilter = new InDimFilter(
vcName,
correlatedValues,
null,
null
).toFilter();
newFilters.add(rewrittenFilter);
}
}
}
if (newFilters.isEmpty()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
return new JoinFilterAnalysis(
true,
selectorFilter,
Filters.and(newFilters),
pushdownVirtualColumns
);
}
}
return new JoinFilterAnalysis(
false,
selectorFilter,
selectorFilter,
ImmutableList.of()
);
}
private static String getCorrelatedBaseExprVirtualColumnName(int counter)
{
// May want to have this check other column names to absolutely prevent name conflicts
return PUSH_DOWN_VIRTUAL_COLUMN_NAME_BASE + counter;
}
/**
* Helper method for rewriting filters on join table columns into filters on base table columns.
*
* @param filterColumn A join table column that we're filtering on
* @param filterValue The value to filter on
* @param correlatedJoinColumn A join table column that appears as the RHS of an equicondition, which we can correlate
* with a column on the base table
* @param clauseForFilteredTable The joinable clause that corresponds to the join table being filtered on
*
* @return A list of values of the correlatedJoinColumn that appear in rows where filterColumn = filterValue
* Returns an empty set if we cannot determine the correlated values.
*/
private static Set<String> getCorrelatedValuesForPushDown(
String filterColumn,
String filterValue,
String correlatedJoinColumn,
JoinableClause clauseForFilteredTable
)
{
String filterColumnNoPrefix = filterColumn.substring(clauseForFilteredTable.getPrefix().length());
String correlatedColumnNoPrefix = correlatedJoinColumn.substring(clauseForFilteredTable.getPrefix().length());
return clauseForFilteredTable.getJoinable().getCorrelatedColumnValues(
filterColumnNoPrefix,
filterValue,
correlatedColumnNoPrefix
);
}
/**
* For each rhs column that appears in the equiconditions for a table's JoinableClause,
* we try to determine what base table columns are related to the rhs column through the total set of equiconditions.
* We do this by searching backwards through the chain of join equiconditions using the provided equicondition map.
*
* For example, suppose we have 3 tables, A,B,C, joined with the following conditions, where A is the base table:
* A.joinColumn == B.joinColumn
* B.joinColum == C.joinColumn
*
* We would determine that C.joinColumn is correlated with A.joinColumn: we first see that
* C.joinColumn is linked to B.joinColumn which in turn is linked to A.joinColumn
*
* Suppose we had the following join conditions instead:
* f(A.joinColumn) == B.joinColumn
* B.joinColum == C.joinColumn
* In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn would be linked to f(A.joinColumn).
*
* Suppose we had the following join conditions instead:
* A.joinColumn == B.joinColumn
* f(B.joinColum) == C.joinColumn
*
* Because we cannot reverse the function f() applied to the second table B in all cases,
* we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn
*
* @param adapter The adapter for the join. Used to determine if a column is a base table column.
* @param tablePrefix Prefix for a join table
* @param clauseForTablePrefix Joinable clause for the prefix
* @param equiConditions Map of equiconditions, keyed by the right hand columns
*
* @return A list of correlatation analyses for the equicondition RHS columns that reside in the table associated with
* the tablePrefix
*/
private static Optional<List<JoinFilterColumnCorrelationAnalysis>> findCorrelatedBaseTableColumns(
HashJoinSegmentStorageAdapter adapter,
String tablePrefix,
JoinableClause clauseForTablePrefix,
Map<String, Expr> equiConditions
)
{
JoinConditionAnalysis jca = clauseForTablePrefix.getCondition();
List<String> rhsColumns = new ArrayList<>();
for (Equality eq : jca.getEquiConditions()) {
rhsColumns.add(tablePrefix + eq.getRightColumn());
}
List<JoinFilterColumnCorrelationAnalysis> correlations = new ArrayList<>();
for (String rhsColumn : rhsColumns) {
List<String> correlatedBaseColumns = new ArrayList<>();
List<Expr> correlatedBaseExpressions = new ArrayList<>();
boolean terminate = false;
String findMappingFor = rhsColumn;
while (!terminate) {
Expr lhs = equiConditions.get(findMappingFor);
if (lhs == null) {
break;
}
String identifier = lhs.getBindingIfIdentifier();
if (identifier == null) {
// We push down if the function only requires base table columns
Expr.BindingDetails bindingDetails = lhs.analyzeInputs();
Set<String> requiredBindings = bindingDetails.getRequiredBindings();
if (!requiredBindings.stream().allMatch(requiredBinding -> adapter.isBaseColumn(requiredBinding))) {
return Optional.empty();
}
terminate = true;
correlatedBaseExpressions.add(lhs);
} else {
// simple identifier, see if we can correlate it with a column on the base table
findMappingFor = identifier;
if (adapter.isBaseColumn(identifier)) {
terminate = true;
correlatedBaseColumns.add(findMappingFor);
}
}
}
if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) {
return Optional.empty();
}
// We should merge correlation analyses if they're for the same rhsColumn
// See https://github.com/apache/druid/issues/9328
correlations.add(
new JoinFilterColumnCorrelationAnalysis(
rhsColumn,
correlatedBaseColumns,
correlatedBaseExpressions
)
);
}
return Optional.of(correlations);
}
private static boolean filterMatchesNull(Filter filter)
{
ValueMatcher valueMatcher = filter.makeMatcher(ALL_NULL_COLUMN_SELECTOR_FACTORY);
return valueMatcher.matches();
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter;
import org.apache.druid.math.expr.Expr;
import java.util.List;
/**
* Represents an analysis of what base table columns, if any, can be correlated with a column that will
* be filtered on.
* <p>
* For example, if we're joining on a base table via the equiconditions (id = j.id AND f(id2) = j.id2),
* then we can correlate j.id with id (base table column) and j.id2 with f(id2) (a base table expression).
*/
public class JoinFilterColumnCorrelationAnalysis
{
private final String joinColumn;
private final List<String> baseColumns;
private final List<Expr> baseExpressions;
public JoinFilterColumnCorrelationAnalysis(
String joinColumn,
List<String> baseColumns,
List<Expr> baseExpressions
)
{
this.joinColumn = joinColumn;
this.baseColumns = baseColumns;
this.baseExpressions = baseExpressions;
}
public String getJoinColumn()
{
return joinColumn;
}
public List<String> getBaseColumns()
{
return baseColumns;
}
public List<Expr> getBaseExpressions()
{
return baseExpressions;
}
public boolean supportsPushDown()
{
return !baseColumns.isEmpty() && baseExpressions.isEmpty();
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Holds the result of splitting a filter into:
* - a portion that can be pushed down to the base table
* - a portion that will be applied post-join
* - additional virtual columns that need to be created on the base table to support the pushed down filters.
*/
public class JoinFilterSplit
{
final Optional<Filter> baseTableFilter;
final Optional<Filter> joinTableFilter;
final List<VirtualColumn> pushDownVirtualColumns;
public JoinFilterSplit(
@Nullable Filter baseTableFilter,
@Nullable Filter joinTableFilter,
List<VirtualColumn> pushDownVirtualColumns
)
{
this.baseTableFilter = baseTableFilter == null ? Optional.empty() : Optional.of(baseTableFilter);
this.joinTableFilter = joinTableFilter == null ? Optional.empty() : Optional.of(joinTableFilter);
this.pushDownVirtualColumns = pushDownVirtualColumns;
}
public Optional<Filter> getBaseTableFilter()
{
return baseTableFilter;
}
public Optional<Filter> getJoinTableFilter()
{
return joinTableFilter;
}
public List<VirtualColumn> getPushDownVirtualColumns()
{
return pushDownVirtualColumns;
}
@Override
public String toString()
{
return "JoinFilterSplit{" +
"baseTableFilter=" + baseTableFilter +
", joinTableFilter=" + joinTableFilter +
", pushDownVirtualColumns=" + pushDownVirtualColumns +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JoinFilterSplit that = (JoinFilterSplit) o;
return Objects.equals(getBaseTableFilter(), that.getBaseTableFilter()) &&
Objects.equals(getJoinTableFilter(), that.getJoinTableFilter()) &&
Objects.equals(getPushDownVirtualColumns(), that.getPushDownVirtualColumns());
}
@Override
public int hashCode()
{
return Objects.hash(getBaseTableFilter(), getJoinTableFilter(), getPushDownVirtualColumns());
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.join.lookup;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
@ -31,6 +32,7 @@ import org.apache.druid.segment.join.Joinable;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
public class LookupJoinable implements Joinable
{
@ -83,4 +85,28 @@ public class LookupJoinable implements Joinable
{
return LookupJoinMatcher.create(extractor, leftSelectorFactory, condition, remainderNeeded);
}
@Override
public Set<String> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName
)
{
Set<String> correlatedValues;
if (LookupColumnSelectorFactory.KEY_COLUMN.equals(searchColumnName)) {
if (LookupColumnSelectorFactory.KEY_COLUMN.equals(retrievalColumnName)) {
correlatedValues = ImmutableSet.of(searchColumnValue);
} else {
correlatedValues = ImmutableSet.of(extractor.apply(searchColumnName));
}
} else {
if (LookupColumnSelectorFactory.VALUE_COLUMN.equals(retrievalColumnName)) {
correlatedValues = ImmutableSet.of(searchColumnValue);
} else {
correlatedValues = ImmutableSet.copyOf(extractor.unapply(searchColumnValue));
}
}
return correlatedValues;
}
}

View File

@ -19,6 +19,8 @@
package org.apache.druid.segment.join.table;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.join.JoinConditionAnalysis;
@ -26,7 +28,9 @@ import org.apache.druid.segment.join.JoinMatcher;
import org.apache.druid.segment.join.Joinable;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class IndexedTableJoinable implements Joinable
{
@ -75,4 +79,41 @@ public class IndexedTableJoinable implements Joinable
remainderNeeded
);
}
@Override
public Set<String> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName
)
{
int filterColumnPosition = table.allColumns().indexOf(searchColumnName);
int correlatedColumnPosition = table.allColumns().indexOf(retrievalColumnName);
if (filterColumnPosition < 0 || correlatedColumnPosition < 0) {
return ImmutableSet.of();
}
Set<String> correlatedValues = new HashSet<>();
if (table.keyColumns().contains(searchColumnName)) {
IndexedTable.Index index = table.columnIndex(filterColumnPosition);
IndexedTable.Reader reader = table.columnReader(correlatedColumnPosition);
IntList rowIndex = index.find(searchColumnValue);
for (int i = 0; i < rowIndex.size(); i++) {
int rowNum = rowIndex.getInt(i);
correlatedValues.add(reader.read(rowNum).toString());
}
return correlatedValues;
} else {
IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition);
IndexedTable.Reader correlatedColumnReader = table.columnReader(correlatedColumnPosition);
for (int i = 0; i < table.numRows(); i++) {
if (searchColumnValue.equals(dimNameReader.read(i).toString())) {
correlatedValues.add(correlatedColumnReader.read(i).toString());
}
}
return correlatedValues;
}
}
}

View File

@ -21,7 +21,9 @@ package org.apache.druid.segment.virtual;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@ -62,6 +64,23 @@ public class ExpressionVirtualColumn implements VirtualColumn
this.parsedExpression = Suppliers.memoize(() -> Parser.parse(expression, macroTable));
}
/**
* Constructor for creating an ExpressionVirtualColumn from a pre-parsed expression.
*/
public ExpressionVirtualColumn(
String name,
Expr parsedExpression,
ValueType outputType
)
{
this.name = Preconditions.checkNotNull(name, "name");
// Unfortunately this string representation can't be reparsed into the same expression, might be useful
// if the expression system supported that
this.expression = parsedExpression.toString();
this.outputType = outputType != null ? outputType : ValueType.FLOAT;
this.parsedExpression = Suppliers.ofInstance(parsedExpression);
}
@JsonProperty("name")
@Override
public String getOutputName()
@ -81,6 +100,13 @@ public class ExpressionVirtualColumn implements VirtualColumn
return outputType;
}
@JsonIgnore
@VisibleForTesting
public Supplier<Expr> getParsedExpression()
{
return parsedExpression;
}
@Override
public DimensionSelector makeDimensionSelector(
final DimensionSpec dimensionSpec,

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.filter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
@ -175,4 +176,10 @@ public class AndFilterTest extends BaseFilterTest
ImmutableList.of("0", "1", "2", "3", "4", "5")
);
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(AndFilter.class).usingGetClass().withNonnullFields("filters").verify();
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.filter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Pair;
@ -721,4 +722,14 @@ public class BoundFilterTest extends BaseFilterTest
ImmutableList.of("1", "2", "4", "5", "6")
);
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(BoundFilter.class)
.usingGetClass()
.withNonnullFields("boundDimFilter", "comparator")
.withIgnoredFields("longPredicateSupplier", "floatPredicateSupplier", "doublePredicateSupplier")
.verify();
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -348,6 +349,16 @@ public class InFilterTest extends BaseFilterTest
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(InFilter.class)
.usingGetClass()
.withNonnullFields("dimension", "values")
.withIgnoredFields("longPredicateSupplier", "floatPredicateSupplier", "doublePredicateSupplier")
.verify();
}
private DimFilter toInFilter(String dim)
{
List<String> emptyList = new ArrayList<>();

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.filter;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class OrFilterTest
{
@Test
public void test_equals()
{
EqualsVerifier.forClass(OrFilter.class).usingGetClass().withNonnullFields("filters").verify();
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.filter;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.extraction.MapLookupExtractor;
@ -328,4 +329,10 @@ public class SelectorFilterTest extends BaseFilterTest
assertFilterMatches(new SelectorDimFilter("l0", null, null), ImmutableList.of("3"));
}
}
@Test
public void test_equals()
{
EqualsVerifier.forClass(SelectorFilter.class).usingGetClass().withNonnullFields("dimension").verify();
}
}

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
public class BaseHashJoinSegmentStorageAdapterTest
{
public static final String FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX = "c1.";
public static final String FACT_TO_COUNTRY_ON_NUMBER_PREFIX = "c2.";
public static final String FACT_TO_REGION_PREFIX = "r1.";
public static final String REGION_TO_COUNTRY_PREFIX = "rtc.";
public static Long NULL_COUNTRY;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
public QueryableIndexSegment factSegment;
public LookupExtractor countryIsoCodeToNameLookup;
public LookupExtractor countryNumberToNameLookup;
public IndexedTable countriesTable;
public IndexedTable regionsTable;
@BeforeClass
public static void setUpStatic()
{
NullHandling.initializeForTests();
NULL_COUNTRY = NullHandling.sqlCompatible() ? null : 0L;
}
@Before
public void setUp() throws IOException
{
factSegment = new QueryableIndexSegment(
JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(),
SegmentId.dummy("facts")
);
countryIsoCodeToNameLookup = JoinTestHelper.createCountryIsoCodeToNameLookup();
countryNumberToNameLookup = JoinTestHelper.createCountryNumberToNameLookup();
countriesTable = JoinTestHelper.createCountriesIndexedTable();
regionsTable = JoinTestHelper.createRegionsIndexedTable();
}
@After
public void tearDown()
{
if (factSegment != null) {
factSegment.close();
}
}
protected JoinableClause factToCountryNameUsingIsoCodeLookup(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
LookupJoinable.wrap(countryIsoCodeToNameLookup),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%sk\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
);
}
protected JoinableClause factToCountryNameUsingNumberLookup(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
LookupJoinable.wrap(countryNumberToNameLookup),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%sk\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX),
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
ExprMacroTable.nil()
)
);
}
protected JoinableClause factToCountryOnIsoCode(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%scountryIsoCode\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
);
}
protected JoinableClause factToCountryOnNumber(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
new IndexedTableJoinable(countriesTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%scountryNumber\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX),
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
ExprMacroTable.nil()
)
);
}
protected JoinableClause factToRegion(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_REGION_PREFIX,
new IndexedTableJoinable(regionsTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%sregionIsoCode\" == regionIsoCode && \"%scountryIsoCode\" == countryIsoCode",
FACT_TO_REGION_PREFIX,
FACT_TO_REGION_PREFIX
),
FACT_TO_REGION_PREFIX,
ExprMacroTable.nil()
)
);
}
protected JoinableClause regionToCountry(final JoinType joinType)
{
return new JoinableClause(
REGION_TO_COUNTRY_PREFIX,
new IndexedTableJoinable(countriesTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%scountryIsoCode\" == \"%scountryIsoCode\"",
FACT_TO_REGION_PREFIX,
REGION_TO_COUNTRY_PREFIX
),
REGION_TO_COUNTRY_PREFIX,
ExprMacroTable.nil()
)
);
}
protected HashJoinSegmentStorageAdapter makeFactToCountrySegment()
{
return new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))
);
}
protected void compareExpressionVirtualColumns(
ExpressionVirtualColumn expectedVirtualColumn,
ExpressionVirtualColumn actualVirtualColumn
)
{
Assert.assertEquals(
expectedVirtualColumn.getOutputName(),
actualVirtualColumn.getOutputName()
);
Assert.assertEquals(
expectedVirtualColumn.getOutputType(),
actualVirtualColumn.getOutputType()
);
Assert.assertEquals(
expectedVirtualColumn.getParsedExpression().get().toString(),
actualVirtualColumn.getParsedExpression().get().toString()
);
}
}

View File

@ -30,81 +30,24 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Collections;
public class HashJoinSegmentStorageAdapterTest
public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorageAdapterTest
{
private static final String FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX = "c1.";
private static final String FACT_TO_COUNTRY_ON_NUMBER_PREFIX = "c2.";
private static final String FACT_TO_REGION_PREFIX = "r1.";
private static final String REGION_TO_COUNTRY_PREFIX = "rtc.";
private static Long NULL_COUNTRY;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
public QueryableIndexSegment factSegment;
public LookupExtractor countryIsoCodeToNameLookup;
public LookupExtractor countryNumberToNameLookup;
public IndexedTable countriesTable;
public IndexedTable regionsTable;
@BeforeClass
public static void setUpStatic()
{
NullHandling.initializeForTests();
NULL_COUNTRY = NullHandling.sqlCompatible() ? null : 0L;
}
@Before
public void setUp() throws IOException
{
factSegment = new QueryableIndexSegment(
JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(),
SegmentId.dummy("facts")
);
countryIsoCodeToNameLookup = JoinTestHelper.createCountryIsoCodeToNameLookup();
countryNumberToNameLookup = JoinTestHelper.createCountryNumberToNameLookup();
countriesTable = JoinTestHelper.createCountriesIndexedTable();
regionsTable = JoinTestHelper.createRegionsIndexedTable();
}
@After
public void tearDown()
{
if (factSegment != null) {
factSegment.close();
}
}
@Test
public void test_getInterval_factToCountry()
{
Assert.assertEquals(
Intervals.of("2015-09-12/2015-09-12T02:33:40.060Z"),
Intervals.of("2015-09-12/2015-09-12T04:43:40.060Z"),
makeFactToCountrySegment().getInterval()
);
}
@ -145,7 +88,7 @@ public class HashJoinSegmentStorageAdapterTest
public void test_getDimensionCardinality_factToCountryFactColumn()
{
Assert.assertEquals(
15,
17,
makeFactToCountrySegment().getDimensionCardinality("countryIsoCode")
);
}
@ -154,7 +97,7 @@ public class HashJoinSegmentStorageAdapterTest
public void test_getDimensionCardinality_factToCountryJoinColumn()
{
Assert.assertEquals(
15,
17,
makeFactToCountrySegment().getDimensionCardinality(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName")
);
}
@ -190,7 +133,7 @@ public class HashJoinSegmentStorageAdapterTest
public void test_getMaxTime_factToCountry()
{
Assert.assertEquals(
DateTimes.of("2015-09-12T02:33:40.059Z"),
DateTimes.of("2015-09-12T04:43:40.059Z"),
makeFactToCountrySegment().getMaxTime()
);
}
@ -325,7 +268,7 @@ public class HashJoinSegmentStorageAdapterTest
public void test_getMaxIngestedEventTime_factToCountry()
{
Assert.assertEquals(
DateTimes.of("2015-09-12T02:33:40.059Z"),
DateTimes.of("2015-09-12T04:43:40.059Z"),
makeFactToCountrySegment().getMaxIngestedEventTime()
);
}
@ -396,7 +339,9 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Wendigo", "SV", "SV", "El Salvador", 12L},
new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L},
new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L},
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L},
new Object[]{"Cream Soda", "SU", "SU", "States United", 15L},
new Object[]{"Orange Soda", "MatchNothing", null, null, NULL_COUNTRY}
)
);
}
@ -444,7 +389,8 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Wendigo", "SV", "SV", "El Salvador", 12L},
new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L},
new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L},
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L},
new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}
)
);
}
@ -491,7 +437,8 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Wendigo", "SV", "SV", "El Salvador"},
new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway"},
new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador"},
new Object[]{"Old Anatolian Turkish", "US", "US", "United States"}
new Object[]{"Old Anatolian Turkish", "US", "US", "United States"},
new Object[]{"Cream Soda", "SU", "SU", "States United"}
)
);
}
@ -532,7 +479,8 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Carlo Curti", "US", "US", "United States", 13L},
new Object[]{"Giusy Ferreri discography", "IT", "IT", "Italy", 7L},
new Object[]{"Roma-Bangkok", "IT", "IT", "Italy", 7L},
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L},
new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}
) :
ImmutableList.of(
new Object[]{"Talk:Oswald Tilghman", null, "AU", "Australia", 0L},
@ -545,7 +493,8 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Carlo Curti", "US", "US", "United States", 13L},
new Object[]{"Giusy Ferreri discography", "IT", "IT", "Italy", 7L},
new Object[]{"Roma-Bangkok", "IT", "IT", "Italy", 7L},
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L},
new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}
)
);
}
@ -584,7 +533,8 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Carlo Curti", "US", "United States"},
new Object[]{"Giusy Ferreri discography", "IT", "Italy"},
new Object[]{"Roma-Bangkok", "IT", "Italy"},
new Object[]{"Old Anatolian Turkish", "US", "United States"}
new Object[]{"Old Anatolian Turkish", "US", "United States"},
new Object[]{"Cream Soda", "SU", "States United"}
) :
ImmutableList.of(
new Object[]{"Talk:Oswald Tilghman", null, "Australia"},
@ -597,7 +547,8 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Carlo Curti", "US", "United States"},
new Object[]{"Giusy Ferreri discography", "IT", "Italy"},
new Object[]{"Roma-Bangkok", "IT", "Italy"},
new Object[]{"Old Anatolian Turkish", "US", "United States"}
new Object[]{"Old Anatolian Turkish", "US", "United States"},
new Object[]{"Cream Soda", "SU", "States United"}
)
);
}
@ -654,7 +605,8 @@ public class HashJoinSegmentStorageAdapterTest
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber"
),
ImmutableList.of(
new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L}
new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L},
new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "USCA", "Usca", 16L}
)
);
}
@ -683,7 +635,8 @@ public class HashJoinSegmentStorageAdapterTest
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber"
),
ImmutableList.of(
new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L}
new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "AX", "Atlantis", 14L},
new Object[]{null, null, NullHandling.sqlCompatible() ? null : 0L, "USCA", "Usca", 16L}
)
);
}
@ -846,7 +799,8 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Wendigo", "SV", "SV", "El Salvador", 12L},
new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L},
new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L},
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}
new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L},
new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}
)
);
}
@ -900,7 +854,9 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Wendigo", "Departamento de San Salvador", "El Salvador"},
new Object[]{"Алиса в Зазеркалье", "Finnmark Fylke", "Norway"},
new Object[]{"Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "Ecuador"},
new Object[]{"Old Anatolian Turkish", "Virginia", "United States"}
new Object[]{"Old Anatolian Turkish", "Virginia", "United States"},
new Object[]{"Cream Soda", "Ainigriv", "States United"},
new Object[]{"Orange Soda", null, null}
)
);
}
@ -950,7 +906,9 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Diskussion:Sebastian Schulz", "Norway"},
new Object[]{"Diskussion:Sebastian Schulz", "El Salvador"},
new Object[]{"Diskussion:Sebastian Schulz", "United States"},
new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"}
new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"},
new Object[]{"Diskussion:Sebastian Schulz", "States United"},
new Object[]{"Diskussion:Sebastian Schulz", "Usca"}
)
);
}
@ -1036,7 +994,9 @@ public class HashJoinSegmentStorageAdapterTest
new Object[]{"Diskussion:Sebastian Schulz", "Norway"},
new Object[]{"Diskussion:Sebastian Schulz", "El Salvador"},
new Object[]{"Diskussion:Sebastian Schulz", "United States"},
new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"}
new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"},
new Object[]{"Diskussion:Sebastian Schulz", "States United"},
new Object[]{"Diskussion:Sebastian Schulz", "Usca"}
)
);
}
@ -1287,104 +1247,4 @@ public class HashJoinSegmentStorageAdapterTest
ImmutableList.of()
);
}
private JoinableClause factToCountryNameUsingIsoCodeLookup(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
LookupJoinable.wrap(countryIsoCodeToNameLookup),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%sk\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
);
}
private JoinableClause factToCountryNameUsingNumberLookup(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
LookupJoinable.wrap(countryNumberToNameLookup),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%sk\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX),
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
ExprMacroTable.nil()
)
);
}
private JoinableClause factToCountryOnIsoCode(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%scountryIsoCode\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
);
}
private JoinableClause factToCountryOnNumber(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
new IndexedTableJoinable(countriesTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%scountryNumber\" == countryNumber", FACT_TO_COUNTRY_ON_NUMBER_PREFIX),
FACT_TO_COUNTRY_ON_NUMBER_PREFIX,
ExprMacroTable.nil()
)
);
}
private JoinableClause factToRegion(final JoinType joinType)
{
return new JoinableClause(
FACT_TO_REGION_PREFIX,
new IndexedTableJoinable(regionsTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%sregionIsoCode\" == regionIsoCode && \"%scountryIsoCode\" == countryIsoCode",
FACT_TO_REGION_PREFIX,
FACT_TO_REGION_PREFIX
),
FACT_TO_REGION_PREFIX,
ExprMacroTable.nil()
)
);
}
private JoinableClause regionToCountry(final JoinType joinType)
{
return new JoinableClause(
REGION_TO_COUNTRY_PREFIX,
new IndexedTableJoinable(countriesTable),
joinType,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%scountryIsoCode\" == \"%scountryIsoCode\"",
FACT_TO_REGION_PREFIX,
REGION_TO_COUNTRY_PREFIX
),
REGION_TO_COUNTRY_PREFIX,
ExprMacroTable.nil()
)
);
}
private HashJoinSegmentStorageAdapter makeFactToCountrySegment()
{
return new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))
);
}
}

View File

@ -93,7 +93,7 @@ public class RowBasedIndexedTableTest
@Test
public void test_numRows_countries()
{
Assert.assertEquals(15, countriesTable.numRows());
Assert.assertEquals(17, countriesTable.numRows());
}
@Test

View File

@ -13,3 +13,5 @@
{"countryNumber":12,"countryIsoCode":"SV","countryName":"El Salvador"}
{"countryNumber":13,"countryIsoCode":"US","countryName":"United States"}
{"countryNumber":14,"countryIsoCode":"AX","countryName":"Atlantis"}
{"countryNumber":15,"countryIsoCode":"SU","countryName":"States United"}
{"countryNumber":16,"countryIsoCode":"USCA","countryName":"Usca"}

View File

@ -24,3 +24,5 @@
{"time":"2015-09-12T01:01:00.474Z","channel":"#ru.wikipedia","regionIsoCode":"20","countryNumber":11,"countryIsoCode":"NO","user":"85.113.179.226","delta":48,"isRobot":false,"isAnonymous":true,"page":"Алиса в Зазеркалье","namespace":"Main"}
{"time":"2015-09-12T01:02:08.440Z","channel":"#es.wikipedia","regionIsoCode":"G","countryNumber":4,"countryIsoCode":"EC","user":"181.39.132.136","delta":29,"isRobot":false,"isAnonymous":true,"page":"Gabinete Ministerial de Rafael Correa","namespace":"Main"}
{"time":"2015-09-12T02:33:40.059Z","channel":"#en.wikipedia","regionIsoCode":"VA","countryNumber":13,"countryIsoCode":"US","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Old Anatolian Turkish","namespace":"Main"}
{"time":"2015-09-12T03:43:40.059Z","channel":"#en.wikipedia","regionIsoCode":"AV","countryNumber":15,"countryIsoCode":"SU","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Cream Soda","namespace":"Main"}
{"time":"2015-09-12T04:43:40.059Z","channel":"#en.wikipedia","regionIsoCode":"MatchNothing","countryNumber":99,"countryIsoCode":"MatchNothing","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Orange Soda","namespace":"Main"}

View File

@ -15,3 +15,5 @@
{"regionIsoCode":"SS","countryIsoCode":"SV","regionName":"Departamento de San Salvador"}
{"regionIsoCode":"VA","countryIsoCode":"IT","regionName":"Provincia di Varese"}
{"regionIsoCode":"VA","countryIsoCode":"US","regionName":"Virginia"}
{"regionIsoCode":"AV","countryIsoCode":"SU","regionName":"Ainigriv"}
{"regionIsoCode":"ZZ","countryIsoCode":"USCA","regionName":"Usca City"}