More efficient join filter rewrites (#9516)

* More efficient join filter rewrites

* Rebase

* Remove unused functions

* PR comments, fix compile

* Adjust comment

* Allow filter rewrite when join condition has LHS expression

* Fix inspections

* Fix tests
This commit is contained in:
Jonathan Wei 2020-03-16 22:16:14 -07:00 committed by GitHub
parent 142742f291
commit b1847364b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1869 additions and 860 deletions

View File

@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.filter.SelectorDimFilter;
@ -49,6 +48,8 @@ import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -69,6 +70,7 @@ import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -125,76 +127,112 @@ public class JoinAndLookupBenchmark
baseSegment = new QueryableIndexSegment(index, SegmentId.dummy("join"));
List<JoinableClause> joinableClausesLookupStringKey = ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysisLookupStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesLookupStringKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);
hashJoinLookupStringKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
joinableClausesLookupStringKey,
preAnalysisLookupStringKey
);
List<JoinableClause> joinableClausesLookupLongKey = ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryIsoCodeToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysisLookupLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesLookupLongKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
LookupJoinable.wrap(JoinTestHelper.createCountryNumberToNameLookup()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%sk\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
joinableClausesLookupLongKey,
preAnalysisLookupLongKey
);
List<JoinableClause> joinableClausesIndexedTableStringKey = ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%scountryIsoCode\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesIndexedTableStringKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryIsoCode == \"%scountryIsoCode\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
joinableClausesIndexedTableStringKey,
preAnalysisIndexedTableStringKey
);
List<JoinableClause> joinableClausesIndexedTableLonggKey = ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%scountryNumber\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClausesIndexedTableLonggKey,
VirtualColumns.EMPTY,
null,
false,
false,
false,
0
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
prefix,
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("countryNumber == \"%scountryNumber\"", prefix),
prefix,
ExprMacroTable.nil()
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
joinableClausesIndexedTableLonggKey,
preAnalysisIndexedTableLongKey
);
final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();

View File

@ -37,6 +37,7 @@ import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -158,4 +159,8 @@ public interface Query<T>
return withOverriddenContext(ImmutableMap.of(QueryContexts.LANE_KEY, lane));
}
default VirtualColumns getVirtualColumns()
{
return VirtualColumns.EMPTY;
}
}

View File

@ -47,7 +47,9 @@ public class QueryContexts
public static final String VECTORIZE_KEY = "vectorize";
public static final String VECTOR_SIZE_KEY = "vectorSize";
public static final String JOIN_FILTER_PUSH_DOWN_KEY = "enableJoinFilterPushDown";
public static final String JOIN_FILTER_REWRITE_KEY = "enableJoinFilterRewrite";
public static final String JOIN_FILTER_REWRITE_ENABLE_KEY = "enableJoinFilterRewrite";
public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
@ -61,7 +63,9 @@ public class QueryContexts
public static final long NO_TIMEOUT = 0;
public static final boolean DEFAULT_ENABLE_PARALLEL_MERGE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = false;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY = 10000;
@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
@ -227,6 +231,19 @@ public class QueryContexts
{
return parseInt(query, BROKER_PARALLELISM, defaultValue);
}
public static <T> boolean getEnableJoinFilterRewriteValueColumnFilters(Query<T> query)
{
return parseBoolean(
query,
JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY,
DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS
);
}
public static <T> long getJoinFilterRewriteMaxSize(Query<T> query)
{
return parseLong(query, JOIN_FILTER_REWRITE_MAX_SIZE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY);
}
public static <T> boolean getEnableJoinFilterPushDown(Query<T> query)
{
@ -235,9 +252,10 @@ public class QueryContexts
public static <T> boolean getEnableJoinFilterRewrite(Query<T> query)
{
return parseBoolean(query, JOIN_FILTER_REWRITE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE);
return parseBoolean(query, JOIN_FILTER_REWRITE_ENABLE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE);
}
public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{
Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);

View File

@ -249,6 +249,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return subtotalsSpec;
}
@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{

View File

@ -183,6 +183,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return maxSegmentPartitionsOrderedInMemory;
}
@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{

View File

@ -103,6 +103,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return Query.TIMESERIES;
}
@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{

View File

@ -113,6 +113,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
return TOPN;
}
@Override
@JsonProperty
public VirtualColumns getVirtualColumns()
{

View File

@ -24,6 +24,7 @@ import org.apache.druid.segment.AbstractSegment;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@ -40,27 +41,23 @@ public class HashJoinSegment extends AbstractSegment
{
private final Segment baseSegment;
private final List<JoinableClause> clauses;
private final boolean enableFilterPushDown;
private final boolean enableFilterRewrite;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;
/**
* @param baseSegment The left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param enableFilterPushDown Whether to enable filter push down optimizations to the base segment. In production
* this should generally be {@code QueryContexts.getEnableJoinFilterPushDown(query)}.
* @param baseSegment The left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysis Pre-analysis computed by {@link org.apache.druid.segment.join.filter.JoinFilterAnalyzer#computeJoinFilterPreAnalysis}
*/
public HashJoinSegment(
Segment baseSegment,
List<JoinableClause> clauses,
boolean enableFilterPushDown,
boolean enableFilterRewrite
JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
this.baseSegment = baseSegment;
this.clauses = clauses;
this.enableFilterPushDown = enableFilterPushDown;
this.enableFilterRewrite = enableFilterRewrite;
this.joinFilterPreAnalysis = joinFilterPreAnalysis;
// Verify 'clauses' is nonempty (otherwise it's a waste to create this object, and the caller should know)
if (clauses.isEmpty()) {
@ -93,7 +90,7 @@ public class HashJoinSegment extends AbstractSegment
@Override
public StorageAdapter asStorageAdapter()
{
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, enableFilterPushDown, enableFilterRewrite);
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, joinFilterPreAnalysis);
}
@Override

View File

@ -19,13 +19,11 @@
package org.apache.druid.segment.join;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Capabilities;
@ -38,6 +36,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinFilterSplit;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -55,40 +54,22 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
{
private final StorageAdapter baseAdapter;
private final List<JoinableClause> clauses;
private final boolean enableFilterPushDown;
private final boolean enableFilterRewrite;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;
/**
* @param baseAdapter A StorageAdapter for the left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param enableFilterPushDown Whether to enable filter push down optimizations to the base segment
*/
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses,
final boolean enableFilterPushDown,
final boolean enableFilterRewrite
final JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
this.baseAdapter = baseAdapter;
this.clauses = clauses;
this.enableFilterPushDown = enableFilterPushDown;
this.enableFilterRewrite = enableFilterRewrite;
}
@VisibleForTesting
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses
)
{
this(
baseAdapter,
clauses,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
);
this.joinFilterPreAnalysis = joinFilterPreAnalysis;
}
@Override
@ -237,22 +218,16 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
@Nullable final QueryMetrics<?> queryMetrics
)
{
final List<VirtualColumn> preJoinVirtualColumns = new ArrayList<>();
final List<VirtualColumn> postJoinVirtualColumns = new ArrayList<>();
final Set<String> baseColumns = determineBaseColumnsWithPreAndPostJoinVirtualColumns(
determineBaseColumnsWithPreAndPostJoinVirtualColumns(
virtualColumns,
preJoinVirtualColumns,
postJoinVirtualColumns
);
JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(
this,
baseColumns,
filter,
enableFilterPushDown,
enableFilterRewrite
);
JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
preJoinVirtualColumns.addAll(joinFilterSplit.getPushDownVirtualColumns());
// Soon, we will need a way to push filters past a join when possible. This could potentially be done right here
@ -287,11 +262,6 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
);
}
public List<JoinableClause> getClauses()
{
return clauses;
}
/**
* Returns whether "column" will be selected from "baseAdapter". This is true if it is not shadowed by any joinables
* (i.e. if it does not start with any of their prefixes).
@ -301,11 +271,6 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
return !getClauseForColumn(column).isPresent();
}
public boolean isEnableFilterPushDown()
{
return enableFilterPushDown;
}
/**
* Return a String set containing the name of columns that belong to the base table (including any pre-join virtual
* columns as well).

View File

@ -84,11 +84,17 @@ public interface Joinable
* @param searchColumnName Name of the search column
* @param searchColumnValue Target value of the search column
* @param retrievalColumnName The column to retrieve values from
* @param maxCorrelationSetSize Maximum number of values to retrieve. If we detect that more values would be
* returned than this limit, return an empty set.
* @param allowNonKeyColumnSearch If true, allow searches on non-key columns. If this is false,
* a search on a non-key column should return an empty set.
* @return The set of correlated column values. If we cannot determine correlated values, return an empty set.
*/
Set<String> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName
String retrievalColumnName,
long maxCorrelationSetSize,
boolean allowNonKeyColumnSearch
);
}

View File

@ -21,9 +21,13 @@ package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
@ -77,15 +81,29 @@ public class Joinables
* callers to remember to track metrics on CPU time required for creation of Joinables
* @param enableFilterPushDown whether to enable filter push down optimizations to the base segment. In production
* this should generally be {@code QueryContexts.getEnableJoinFilterPushDown(query)}.
* @param enableFilterRewrite whether to enable filter rewrite optimizations for RHS columns. In production
* this should generally be {@code QueryContexts.getEnableJoinFilterRewrite(query)}.
* @param enableRewriteValueColumnFilters whether to enable filter rewrite optimizations for RHS columns that are not
* key columns. In production this should generally
* be {@code QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query)}.
* @param filterRewriteMaxSize the max allowed size of correlated value sets for RHS rewrites. In production
* this should generally be {@code QueryContexts.getJoinFilterRewriteMaxSize(query)}.
* @param originalFilter The original filter from the query.
* @param virtualColumns The virtual columns from the query.
*/
public static Function<Segment, Segment> createSegmentMapFn(
final List<PreJoinableClause> clauses,
final JoinableFactory joinableFactory,
final AtomicLong cpuTimeAccumulator,
final boolean enableFilterPushDown,
final boolean enableFilterRewrite
final boolean enableFilterRewrite,
final boolean enableRewriteValueColumnFilters,
final long filterRewriteMaxSize,
final Filter originalFilter,
final VirtualColumns virtualColumns
)
{
// compute column correlations here and RHS correlated values
return JvmUtils.safeAccumulateThreadCpuTime(
cpuTimeAccumulator,
() -> {
@ -93,7 +111,16 @@ public class Joinables
return Function.identity();
} else {
final List<JoinableClause> joinableClauses = createJoinableClauses(clauses, joinableFactory);
return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses, enableFilterPushDown, enableFilterRewrite);
JoinFilterPreAnalysis jfpa = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
virtualColumns,
originalFilter,
enableFilterPushDown,
enableFilterRewrite,
enableRewriteValueColumnFilters,
filterRewriteMaxSize
);
return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses, jfpa);
}
}
);

View File

@ -20,25 +20,26 @@
package org.apache.druid.segment.join.filter;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.join.Equality;
import org.apache.druid.segment.join.HashJoinSegmentStorageAdapter;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -50,22 +51,29 @@ import java.util.Set;
/**
* When there is a filter in a join query, we can sometimes improve performance by applying parts of the filter
* when we first read from the base table instead of after the join.
*
* This class provides a {@link #splitFilter(HashJoinSegmentStorageAdapter, Set, Filter, boolean, boolean)} method that
* takes a filter and splits it into a portion that should be applied to the base table prior to the join, and a
* portion that should be applied after the join.
*
*
* The first step of the filter splitting is to convert the filter into
* https://en.wikipedia.org/wiki/Conjunctive_normal_form (an AND of ORs). This allows us to consider each
* OR clause independently as a candidate for filter push down to the base table.
*
*
* A filter clause can be pushed down if it meets one of the following conditions:
* - The filter only applies to columns from the base table
* - The filter applies to columns from the join table, and we determine that the filter can be rewritten
* into a filter on columns from the base table
*
*
* For the second case, where we rewrite filter clauses, the rewritten clause can be less selective than the original,
* so we preserve the original clause in the post-join filtering phase.
*
* The starting point for join analysis is the {@link #computeJoinFilterPreAnalysis} method. This method should be
* called before performing any per-segment join query work. This method converts the query filter into
* conjunctive normal form, and splits the CNF clauses into a portion that only references base table columns and
* a portion that references join table columns. For the filter clauses that apply to join table columns, the
* pre-analysis step computes the information necessary for rewriting such filters into filters on base table columns.
*
* The result of this pre-analysis method should be passed into the next step of join filter analysis, described below.
*
* The {@link #splitFilter(JoinFilterPreAnalysis)} method takes the pre-analysis result and optionally applies the
* filter rewrite and push down operations on a per-segment level.
*/
public class JoinFilterAnalyzer
{
@ -73,61 +81,55 @@ public class JoinFilterAnalyzer
private static final ColumnSelectorFactory ALL_NULL_COLUMN_SELECTOR_FACTORY = new AllNullColumnSelectorFactory();
/**
* Analyze a filter and return a JoinFilterSplit indicating what parts of the filter should be applied pre-join
* and post-join.
* Before making per-segment filter splitting decisions, we first do a pre-analysis step
* where we convert the query filter (if any) into conjunctive normal form and then
* determine the structure of RHS filter rewrites (if any), since this information is shared across all
* per-segment operations.
*
* @param hashJoinSegmentStorageAdapter The storage adapter that is being queried
* @param baseColumnNames Set of names of columns that belong to the base table,
* including pre-join virtual columns
* @param originalFilter Original filter from the query
* @param enableFilterPushDown Whether to enable filter push down
* @return A JoinFilterSplit indicating what parts of the filter should be applied pre-join
* and post-join.
* See {@link JoinFilterPreAnalysis} for details on the result of this pre-analysis step.
*
* @param joinableClauses The joinable clauses from the query
* @param virtualColumns The virtual columns from the query
* @param originalFilter The original filter from the query
* @param enableFilterPushDown Whether to enable filter push down
* @param enableFilterRewrite Whether to enable rewrites of filters involving RHS columns
* @param enableRewriteValueColumnFilters Whether to enable rewrites of filters involving RHS non-key columns
* @param filterRewriteMaxSize The maximum size of the correlated value set for rewritten filters.
* If the correlated value set size exceeds this, the filter will not be
* rewritten and pushed down.
*
* @return A JoinFilterPreAnalysis containing information determined in this pre-analysis step.
*/
public static JoinFilterSplit splitFilter(
HashJoinSegmentStorageAdapter hashJoinSegmentStorageAdapter,
Set<String> baseColumnNames,
@Nullable Filter originalFilter,
public static JoinFilterPreAnalysis computeJoinFilterPreAnalysis(
List<JoinableClause> joinableClauses,
VirtualColumns virtualColumns,
Filter originalFilter,
boolean enableFilterPushDown,
boolean enableFilterRewrite
boolean enableFilterRewrite,
boolean enableRewriteValueColumnFilters,
long filterRewriteMaxSize
)
{
if (originalFilter == null) {
return new JoinFilterSplit(
null,
null,
ImmutableList.of()
);
}
final List<VirtualColumn> preJoinVirtualColumns = new ArrayList<>();
final List<VirtualColumn> postJoinVirtualColumns = new ArrayList<>();
if (!enableFilterPushDown) {
return new JoinFilterSplit(
null,
splitVirtualColumns(joinableClauses, virtualColumns, preJoinVirtualColumns, postJoinVirtualColumns);
if (originalFilter == null || !enableFilterPushDown) {
return new JoinFilterPreAnalysis(
joinableClauses,
originalFilter,
ImmutableList.of()
postJoinVirtualColumns,
null,
null,
null,
enableFilterPushDown,
enableFilterRewrite
);
}
Filter normalizedFilter = Filters.convertToCNF(originalFilter);
// build the prefix and equicondition maps
// We should check that the prefixes do not duplicate or shadow each other. This is not currently implemented,
// but this is tracked at https://github.com/apache/druid/issues/9329
Map<String, Set<Expr>> equiconditions = new HashMap<>();
Map<String, JoinableClause> prefixes = new HashMap<>();
for (JoinableClause clause : hashJoinSegmentStorageAdapter.getClauses()) {
prefixes.put(clause.getPrefix(), clause);
for (Equality equality : clause.getCondition().getEquiConditions()) {
Set<Expr> exprsForRhs = equiconditions.computeIfAbsent(
clause.getPrefix() + equality.getRightColumn(),
(rhs) -> {
return new HashSet<>();
}
);
exprsForRhs.add(equality.getLeftExpr());
}
}
// List of candidates for pushdown
// CNF normalization will generate either
// - an AND filter with multiple subfilters
@ -139,20 +141,213 @@ public class JoinFilterAnalyzer
normalizedOrClauses = Collections.singletonList(normalizedFilter);
}
List<Filter> normalizedBaseTableClauses = new ArrayList<>();
List<Filter> normalizedJoinTableClauses = new ArrayList<>();
for (Filter orClause : normalizedOrClauses) {
Set<String> reqColumns = orClause.getRequiredColumns();
if (areSomeColumnsFromJoin(joinableClauses, reqColumns) || areSomeColumnsFromPostJoinVirtualColumns(
postJoinVirtualColumns,
reqColumns
)) {
normalizedJoinTableClauses.add(orClause);
} else {
normalizedBaseTableClauses.add(orClause);
}
}
if (!enableFilterRewrite) {
return new JoinFilterPreAnalysis(
joinableClauses,
originalFilter,
postJoinVirtualColumns,
normalizedBaseTableClauses,
normalizedJoinTableClauses,
null,
enableFilterPushDown,
enableFilterRewrite
);
}
// build the equicondition map, used for determining how the tables are connected through joins
Map<String, Set<Expr>> equiconditions = new HashMap<>();
for (JoinableClause clause : joinableClauses) {
for (Equality equality : clause.getCondition().getEquiConditions()) {
Set<Expr> exprsForRhs = equiconditions.computeIfAbsent(
clause.getPrefix() + equality.getRightColumn(),
(rhs) -> {
return new HashSet<>();
}
);
exprsForRhs.add(equality.getLeftExpr());
}
}
Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> correlationsByPrefix = new HashMap<>();
// Determine candidates for filter rewrites.
// A candidate is an RHS column that appears in a filter, along with the value being filtered on, plus
// the joinable clause associated with the table that the RHS column is from.
Set<RhsRewriteCandidate> rhsRewriteCandidates = new HashSet<>();
for (Filter orClause : normalizedJoinTableClauses) {
if (filterMatchesNull(orClause)) {
continue;
}
if (orClause instanceof SelectorFilter) {
// this is a candidate for RHS filter rewrite, determine column correlations and correlated values
String reqColumn = ((SelectorFilter) orClause).getDimension();
String reqValue = ((SelectorFilter) orClause).getValue();
JoinableClause joinableClause = isColumnFromJoin(joinableClauses, reqColumn);
if (joinableClause != null) {
rhsRewriteCandidates.add(
new RhsRewriteCandidate(
joinableClause,
reqColumn,
reqValue
)
);
}
}
if (orClause instanceof OrFilter) {
for (Filter subFilter : ((OrFilter) orClause).getFilters()) {
if (subFilter instanceof SelectorFilter) {
String reqColumn = ((SelectorFilter) subFilter).getDimension();
String reqValue = ((SelectorFilter) subFilter).getValue();
JoinableClause joinableClause = isColumnFromJoin(joinableClauses, reqColumn);
if (joinableClause != null) {
rhsRewriteCandidates.add(
new RhsRewriteCandidate(
joinableClause,
reqColumn,
reqValue
)
);
}
}
}
}
}
// Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis based on the RHS rewrite candidates
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates) {
correlationsByPrefix.computeIfAbsent(
rhsRewriteCandidate.getJoinableClause().getPrefix(),
p -> findCorrelatedBaseTableColumns(
joinableClauses,
p,
rhsRewriteCandidate.getJoinableClause(),
equiconditions
)
);
}
// Using the RHS table prefix -> JoinFilterColumnCorrelationAnalysis created in the previous step,
// build a map of rhsFilterColumn -> Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues for specific filter pair
// The Pair(rhsFilterColumn, rhsFilterValue) -> correlatedValues mappings are stored in the
// JoinFilterColumnCorrelationAnalysis objects, which are shared across all rhsFilterColumn entries that belong
// to the same RHS table.
//
// The value is a List<JoinFilterColumnCorrelationAnalysis> instead of a single value because a table can be joined
// to another via multiple columns.
// (See JoinFilterAnalyzerTest.test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS for an example)
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationsByFilteringColumn = new HashMap<>();
for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates) {
Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> correlationsForPrefix = correlationsByPrefix.get(
rhsRewriteCandidate.getJoinableClause().getPrefix()
);
if (correlationsForPrefix.isPresent()) {
for (Map.Entry<String, JoinFilterColumnCorrelationAnalysis> correlationForPrefix : correlationsForPrefix.get()
.entrySet()) {
Optional<List<JoinFilterColumnCorrelationAnalysis>> perColumnCorrelations =
correlationsByFilteringColumn.computeIfAbsent(
rhsRewriteCandidate.getRhsColumn(),
(rhsCol) -> {
return Optional.of(new ArrayList<>());
}
);
perColumnCorrelations.get().add(correlationForPrefix.getValue());
correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent(
Pair.of(rhsRewriteCandidate.getRhsColumn(), rhsRewriteCandidate.getValueForRewrite()),
(rhsVal) -> {
Set<String> correlatedValues = getCorrelatedValuesForPushDown(
rhsRewriteCandidate.getRhsColumn(),
rhsRewriteCandidate.getValueForRewrite(),
correlationForPrefix.getValue().getJoinColumn(),
rhsRewriteCandidate.getJoinableClause(),
enableRewriteValueColumnFilters,
filterRewriteMaxSize
);
if (correlatedValues.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(correlatedValues);
}
}
);
}
} else {
correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), Optional.empty());
}
}
// Go through each per-column analysis list and prune duplicates
for (Map.Entry<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlation : correlationsByFilteringColumn.entrySet()) {
if (correlation.getValue().isPresent()) {
List<JoinFilterColumnCorrelationAnalysis> dedupList = eliminateCorrelationDuplicates(
correlation.getValue().get()
);
correlationsByFilteringColumn.put(correlation.getKey(), Optional.of(dedupList));
}
}
return new JoinFilterPreAnalysis(
joinableClauses,
originalFilter,
postJoinVirtualColumns,
normalizedBaseTableClauses,
normalizedJoinTableClauses,
correlationsByFilteringColumn,
enableFilterPushDown,
enableFilterRewrite
);
}
/**
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis}
* @return A JoinFilterSplit indicating what parts of the filter should be applied pre-join and post-join
*/
public static JoinFilterSplit splitFilter(
JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
if (joinFilterPreAnalysis.getOriginalFilter() == null || !joinFilterPreAnalysis.isEnableFilterPushDown()) {
return new JoinFilterSplit(
null,
joinFilterPreAnalysis.getOriginalFilter(),
ImmutableList.of()
);
}
// Pushdown filters, rewriting if necessary
List<Filter> leftFilters = new ArrayList<>();
List<Filter> rightFilters = new ArrayList<>();
List<VirtualColumn> pushDownVirtualColumns = new ArrayList<>();
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache = new HashMap<>();
for (Filter orClause : normalizedOrClauses) {
for (Filter baseTableFilter : joinFilterPreAnalysis.getNormalizedBaseTableClauses()) {
if (!filterMatchesNull(baseTableFilter)) {
leftFilters.add(baseTableFilter);
} else {
rightFilters.add(baseTableFilter);
}
}
for (Filter orClause : joinFilterPreAnalysis.getNormalizedJoinTableClauses()) {
JoinFilterAnalysis joinFilterAnalysis = analyzeJoinFilterClause(
baseColumnNames,
orClause,
prefixes,
equiconditions,
correlationCache,
enableFilterRewrite
joinFilterPreAnalysis
);
if (joinFilterAnalysis.isCanPushDown()) {
leftFilters.add(joinFilterAnalysis.getPushDownFilter().get());
@ -173,75 +368,39 @@ public class JoinFilterAnalyzer
}
/**
* Analyze a filter clause from a filter that is in conjunctive normal form (AND of ORs).
* The clause is expected to be an OR filter or a leaf filter.
*
* @param baseColumnNames Set of names of columns that belong to the base table, including pre-join virtual columns
* @param filterClause Individual filter clause (an OR filter or a leaf filter) from a filter that is in CNF
* @param prefixes Map of table prefixes
* @param equiconditions Equicondition map
* @param correlationCache Cache of column correlation analyses.
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis}
*
* @return a JoinFilterAnalysis that contains a possible filter rewrite and information on how to handle the filter.
*/
private static JoinFilterAnalysis analyzeJoinFilterClause(
Set<String> baseColumnNames,
Filter filterClause,
Map<String, JoinableClause> prefixes,
Map<String, Set<Expr>> equiconditions,
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache,
boolean enableFilterRewrite
JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
// NULL matching conditions are not currently pushed down.
// They require special consideration based on the join type, and for simplicity of the initial implementation
// this is not currently handled.
if (filterMatchesNull(filterClause)) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
boolean baseTableOnly = true;
for (String requiredColumn : filterClause.getRequiredColumns()) {
if (!baseColumnNames.contains(requiredColumn)) {
baseTableOnly = false;
break;
}
}
if (baseTableOnly) {
return new JoinFilterAnalysis(
false,
filterClause,
filterClause,
ImmutableList.of()
);
}
if (!enableFilterRewrite) {
if (!joinFilterPreAnalysis.isEnableFilterRewrite() || filterMatchesNull(filterClause)) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
}
// Currently we only support rewrites of selector filters and selector filters within OR filters.
if (filterClause instanceof SelectorFilter) {
return rewriteSelectorFilter(
baseColumnNames,
(SelectorFilter) filterClause,
prefixes,
equiconditions,
correlationCache
joinFilterPreAnalysis
);
}
if (filterClause instanceof OrFilter) {
return rewriteOrFilter(
baseColumnNames,
(OrFilter) filterClause,
prefixes,
equiconditions,
correlationCache
joinFilterPreAnalysis
);
}
@ -252,54 +411,37 @@ public class JoinFilterAnalyzer
* Potentially rewrite the subfilters of an OR filter so that the whole OR filter can be pushed down to
* the base table.
*
* @param baseColumnNames Set of names of columns that belong to the base table, including pre-join virtual columns
* @param orFilter OrFilter to be rewritten
* @param prefixes Map of table prefixes to clauses
* @param equiconditions Map of equiconditions
* @param correlationCache Column correlation analysis cache. This will be potentially modified by adding
* any new column correlation analyses to the cache.
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis}
*
* @return A JoinFilterAnalysis indicating how to handle the potentially rewritten filter
*/
private static JoinFilterAnalysis rewriteOrFilter(
Set<String> baseColumnNames,
OrFilter orFilter,
Map<String, JoinableClause> prefixes,
Map<String, Set<Expr>> equiconditions,
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache
JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
boolean retainRhs = false;
List<Filter> newFilters = new ArrayList<>();
for (Filter filter : orFilter.getFilters()) {
boolean allBaseColumns = true;
for (String requiredColumn : filter.getRequiredColumns()) {
if (!baseColumnNames.contains(requiredColumn)) {
allBaseColumns = false;
}
if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(), filter.getRequiredColumns())) {
newFilters.add(filter);
continue;
}
if (!allBaseColumns) {
retainRhs = true;
if (filter instanceof SelectorFilter) {
JoinFilterAnalysis rewritten = rewriteSelectorFilter(
baseColumnNames,
(SelectorFilter) filter,
prefixes,
equiconditions,
correlationCache
);
if (!rewritten.isCanPushDown()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
} else {
newFilters.add(rewritten.getPushDownFilter().get());
}
} else {
retainRhs = true;
if (filter instanceof SelectorFilter) {
JoinFilterAnalysis rewritten = rewriteSelectorFilter(
(SelectorFilter) filter,
joinFilterPreAnalysis
);
if (!rewritten.isCanPushDown()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
} else {
newFilters.add(rewritten.getPushDownFilter().get());
}
} else {
newFilters.add(filter);
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
}
}
@ -314,121 +456,99 @@ public class JoinFilterAnalyzer
/**
* Rewrites a selector filter on a join table into an IN filter on the base table.
*
* @param baseColumnNames Set of names of columns that belong to the base table, including pre-join virtual
* columns
* @param selectorFilter SelectorFilter to be rewritten
* @param prefixes Map of join table prefixes to clauses
* @param equiconditions Map of equiconditions
* @param correlationCache Cache of column correlation analyses. This will be potentially modified by adding
* any new column correlation analyses to the cache.
* @param joinFilterPreAnalysis The pre-analysis computed by {@link #computeJoinFilterPreAnalysis}
*
* @return A JoinFilterAnalysis that indicates how to handle the potentially rewritten filter
*/
private static JoinFilterAnalysis rewriteSelectorFilter(
Set<String> baseColumnNames,
SelectorFilter selectorFilter,
Map<String, JoinableClause> prefixes,
Map<String, Set<Expr>> equiconditions,
Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationCache
JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
List<Filter> newFilters = new ArrayList<>();
List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
String filteringColumn = selectorFilter.getDimension();
for (Map.Entry<String, JoinableClause> prefixAndClause : prefixes.entrySet()) {
if (prefixAndClause.getValue().includesColumn(filteringColumn)) {
Optional<List<JoinFilterColumnCorrelationAnalysis>> correlations = correlationCache.computeIfAbsent(
prefixAndClause.getKey(),
p -> findCorrelatedBaseTableColumns(
baseColumnNames,
p,
prefixes.get(p),
equiconditions
)
String filteringValue = selectorFilter.getValue();
if (areSomeColumnsFromPostJoinVirtualColumns(
joinFilterPreAnalysis.getPostJoinVirtualColumns(),
selectorFilter.getRequiredColumns()
)) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(), selectorFilter.getRequiredColumns())) {
return new JoinFilterAnalysis(
true,
selectorFilter,
selectorFilter,
pushdownVirtualColumns
);
}
Optional<List<JoinFilterColumnCorrelationAnalysis>> correlationAnalyses = joinFilterPreAnalysis.getCorrelationsByFilteringColumn()
.get(filteringColumn);
if (!correlationAnalyses.isPresent()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : correlationAnalyses.get()) {
if (correlationAnalysis.supportsPushDown()) {
Optional<Set<String>> correlatedValues = correlationAnalysis.getCorrelatedValuesMap().get(
Pair.of(filteringColumn, filteringValue)
);
if (!correlations.isPresent()) {
if (!correlatedValues.isPresent()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
List<Filter> newFilters = new ArrayList<>();
List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : correlations.get()) {
if (correlationAnalysis.supportsPushDown()) {
Set<String> correlatedValues = getCorrelatedValuesForPushDown(
selectorFilter.getDimension(),
selectorFilter.getValue(),
correlationAnalysis.getJoinColumn(),
prefixAndClause.getValue()
);
if (correlatedValues.isEmpty()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
for (String correlatedBaseColumn : correlationAnalysis.getBaseColumns()) {
Filter rewrittenFilter = new InDimFilter(
correlatedBaseColumn,
correlatedValues,
null,
null
).toFilter();
newFilters.add(rewrittenFilter);
}
for (Expr correlatedBaseExpr : correlationAnalysis.getBaseExpressions()) {
// We need to create a virtual column for the expressions when pushing down.
// Note that this block is never entered right now, since correlationAnalysis.supportsPushDown()
// will return false if there any correlated expressions on the base table.
// Pushdown of such filters is disabled until the expressions system supports converting an expression
// into a String representation that can be reparsed into the same expression.
// https://github.com/apache/druid/issues/9326 tracks this expressions issue.
String vcName = getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size());
VirtualColumn correlatedBaseExprVirtualColumn = new ExpressionVirtualColumn(
vcName,
correlatedBaseExpr,
ValueType.STRING
);
pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn);
Filter rewrittenFilter = new InDimFilter(
vcName,
correlatedValues,
null,
null
).toFilter();
newFilters.add(rewrittenFilter);
}
}
for (String correlatedBaseColumn : correlationAnalysis.getBaseColumns()) {
Filter rewrittenFilter = new InDimFilter(
correlatedBaseColumn,
correlatedValues.get(),
null,
null
).toFilter();
newFilters.add(rewrittenFilter);
}
if (newFilters.isEmpty()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
for (Expr correlatedBaseExpr : correlationAnalysis.getBaseExpressions()) {
// We need to create a virtual column for the expressions when pushing down
String vcName = getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size());
return new JoinFilterAnalysis(
true,
selectorFilter,
Filters.and(newFilters),
pushdownVirtualColumns
);
VirtualColumn correlatedBaseExprVirtualColumn = new ExpressionVirtualColumn(
vcName,
correlatedBaseExpr,
ValueType.STRING
);
pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn);
Filter rewrittenFilter = new InDimFilter(
vcName,
correlatedValues.get(),
null,
null
).toFilter();
newFilters.add(rewrittenFilter);
}
}
}
// We're not filtering directly on a column from one of the join tables, but
// we might be filtering on a post-join virtual column (which won't have a join prefix). We cannot
// push down such filters, so check that the filtering column appears in the set of base column names (which
// includes pre-join virtual columns).
if (baseColumnNames.contains(filteringColumn)) {
return new JoinFilterAnalysis(
false,
selectorFilter,
selectorFilter,
ImmutableList.of()
);
} else {
if (newFilters.isEmpty()) {
return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
return new JoinFilterAnalysis(
true,
selectorFilter,
Filters.and(newFilters),
pushdownVirtualColumns
);
}
private static String getCorrelatedBaseExprVirtualColumnName(int counter)
@ -453,7 +573,9 @@ public class JoinFilterAnalyzer
String filterColumn,
String filterValue,
String correlatedJoinColumn,
JoinableClause clauseForFilteredTable
JoinableClause clauseForFilteredTable,
boolean enableRewriteValueColumnFilters,
long filterRewriteMaxSize
)
{
String filterColumnNoPrefix = filterColumn.substring(clauseForFilteredTable.getPrefix().length());
@ -462,7 +584,9 @@ public class JoinFilterAnalyzer
return clauseForFilteredTable.getJoinable().getCorrelatedColumnValues(
filterColumnNoPrefix,
filterValue,
correlatedColumnNoPrefix
correlatedColumnNoPrefix,
filterRewriteMaxSize,
enableRewriteValueColumnFilters
);
}
@ -490,8 +614,6 @@ public class JoinFilterAnalyzer
* Because we cannot reverse the function f() applied to the second table B in all cases,
* we cannot relate C.joinColumn to A.joinColumn, and we would not generate a correlation for C.joinColumn
*
* @param baseColumnNames Set of names of columns that belong to the base table, including pre-join virtual
* columns
* @param tablePrefix Prefix for a join table
* @param clauseForTablePrefix Joinable clause for the prefix
* @param equiConditions Map of equiconditions, keyed by the right hand columns
@ -499,8 +621,8 @@ public class JoinFilterAnalyzer
* @return A list of correlation analyses for the equicondition RHS columns that reside in the table associated with
* the tablePrefix
*/
private static Optional<List<JoinFilterColumnCorrelationAnalysis>> findCorrelatedBaseTableColumns(
Set<String> baseColumnNames,
private static Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> findCorrelatedBaseTableColumns(
List<JoinableClause> joinableClauses,
String tablePrefix,
JoinableClause clauseForTablePrefix,
Map<String, Set<Expr>> equiConditions
@ -513,14 +635,14 @@ public class JoinFilterAnalyzer
rhsColumns.add(tablePrefix + eq.getRightColumn());
}
List<JoinFilterColumnCorrelationAnalysis> correlations = new ArrayList<>();
Map<String, JoinFilterColumnCorrelationAnalysis> correlations = new HashMap<>();
for (String rhsColumn : rhsColumns) {
Set<String> correlatedBaseColumns = new HashSet<>();
Set<Expr> correlatedBaseExpressions = new HashSet<>();
getCorrelationForRHSColumn(
baseColumnNames,
joinableClauses,
equiConditions,
rhsColumn,
correlatedBaseColumns,
@ -528,10 +650,11 @@ public class JoinFilterAnalyzer
);
if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) {
return Optional.empty();
continue;
}
correlations.add(
correlations.put(
rhsColumn,
new JoinFilterColumnCorrelationAnalysis(
rhsColumn,
correlatedBaseColumns,
@ -540,9 +663,11 @@ public class JoinFilterAnalyzer
);
}
List<JoinFilterColumnCorrelationAnalysis> dedupCorrelations = eliminateCorrelationDuplicates(correlations);
return Optional.of(dedupCorrelations);
if (correlations.size() == 0) {
return Optional.empty();
} else {
return Optional.of(correlations);
}
}
/**
@ -550,7 +675,6 @@ public class JoinFilterAnalyzer
* and/or expressions for a single RHS column and adds them to the provided sets as it traverses the
* equicondition column relationships.
*
* @param baseColumnNames Set of names of columns that belong to the base table, including pre-join virtual columns
* @param equiConditions Map of equiconditions, keyed by the right hand columns
* @param rhsColumn RHS column to find base table correlations for
* @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified.
@ -558,7 +682,7 @@ public class JoinFilterAnalyzer
* modified.
*/
private static void getCorrelationForRHSColumn(
Set<String> baseColumnNames,
List<JoinableClause> joinableClauses,
Map<String, Set<Expr>> equiConditions,
String rhsColumn,
Set<String> correlatedBaseColumns,
@ -577,18 +701,19 @@ public class JoinFilterAnalyzer
// We push down if the function only requires base table columns
Expr.BindingDetails bindingDetails = lhsExpr.analyzeInputs();
Set<String> requiredBindings = bindingDetails.getRequiredBindings();
if (!baseColumnNames.containsAll(requiredBindings)) {
if (areSomeColumnsFromJoin(joinableClauses, requiredBindings)) {
break;
}
correlatedBaseExpressions.add(lhsExpr);
} else {
// simple identifier, see if we can correlate it with a column on the base table
findMappingFor = identifier;
if (baseColumnNames.contains(identifier)) {
if (isColumnFromJoin(joinableClauses, identifier) == null) {
correlatedBaseColumns.add(findMappingFor);
} else {
getCorrelationForRHSColumn(
baseColumnNames,
joinableClauses,
equiConditions,
findMappingFor,
correlatedBaseColumns,
@ -602,20 +727,21 @@ public class JoinFilterAnalyzer
/**
* Given a list of JoinFilterColumnCorrelationAnalysis, prune the list so that we only have one
* JoinFilterColumnCorrelationAnalysis for each unique combination of base columns.
*
*
* Suppose we have a join condition like the following, where A is the base table:
* A.joinColumn == B.joinColumn && A.joinColumn == B.joinColumn2
*
* A.joinColumn == B.joinColumn && A.joinColumn == B.joinColumn2
*
* We only need to consider one correlation to A.joinColumn since B.joinColumn and B.joinColumn2 must
* have the same value in any row that matches the join condition.
*
*
* In the future this method could consider which column correlation should be preserved based on availability of
* indices and other heuristics.
*
*
* When push down of filters with LHS expressions in the join condition is supported, this method should also
* consider expressions.
*
* @param originalList Original list of column correlation analyses.
*
* @return Pruned list of column correlation analyses.
*/
private static List<JoinFilterColumnCorrelationAnalysis> eliminateCorrelationDuplicates(
@ -635,4 +761,106 @@ public class JoinFilterAnalyzer
ValueMatcher valueMatcher = filter.makeMatcher(ALL_NULL_COLUMN_SELECTOR_FACTORY);
return valueMatcher.matches();
}
/**
 * Locates the join clause, if any, that provides the given column.
 *
 * @param joinableClauses join clauses to search, in order
 * @param column          column name to look up
 *
 * @return the first clause in the list that includes the column, or null when the column is not
 *         provided by any join table
 */
private static JoinableClause isColumnFromJoin(
    List<JoinableClause> joinableClauses,
    String column
)
{
  int numClauses = joinableClauses.size();
  for (int i = 0; i < numClauses; i++) {
    JoinableClause candidate = joinableClauses.get(i);
    if (candidate.includesColumn(column)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Checks whether a column is the output of one of the post-join virtual columns.
 *
 * @param postJoinVirtualColumns virtual columns that can only be computed after the join
 * @param column                 column name to check
 *
 * @return true if the column matches the output name of any post-join virtual column
 */
private static boolean isColumnFromPostJoinVirtualColumns(
    List<VirtualColumn> postJoinVirtualColumns,
    String column
)
{
  for (VirtualColumn candidate : postJoinVirtualColumns) {
    String outputName = candidate.getOutputName();
    if (column.equals(outputName)) {
      return true;
    }
  }
  return false;
}
/**
 * Checks whether at least one of the given columns comes from a join table.
 *
 * @param joinableClauses join clauses to search
 * @param columns         column names to check
 *
 * @return true if any of the columns is included by some join clause
 */
private static boolean areSomeColumnsFromJoin(
    List<JoinableClause> joinableClauses,
    Collection<String> columns
)
{
  for (String candidate : columns) {
    JoinableClause owningClause = isColumnFromJoin(joinableClauses, candidate);
    if (owningClause != null) {
      return true;
    }
  }
  return false;
}
/**
 * Checks whether at least one of the given columns is produced by a post-join virtual column.
 *
 * @param postJoinVirtualColumns virtual columns that can only be computed after the join
 * @param columns                column names to check
 *
 * @return true if any of the columns matches a post-join virtual column's output name
 */
private static boolean areSomeColumnsFromPostJoinVirtualColumns(
    List<VirtualColumn> postJoinVirtualColumns,
    Collection<String> columns
)
{
  for (String candidate : columns) {
    boolean isPostJoin = isColumnFromPostJoinVirtualColumns(postJoinVirtualColumns, candidate);
    if (isPostJoin) {
      return true;
    }
  }
  return false;
}
/**
 * Partitions a query's virtual columns into those that depend only on base table columns
 * (pre-join) and those that require columns from a join table (post-join).
 *
 * @param joinableClauses        join clauses, used to determine which required columns come from
 *                               join tables
 * @param virtualColumns         the query's virtual columns
 * @param preJoinVirtualColumns  receives virtual columns that do not depend on join table
 *                               columns. Will be modified.
 * @param postJoinVirtualColumns receives virtual columns that depend on join table columns.
 *                               Will be modified.
 */
private static void splitVirtualColumns(
    List<JoinableClause> joinableClauses,
    final VirtualColumns virtualColumns,
    final List<VirtualColumn> preJoinVirtualColumns,
    final List<VirtualColumn> postJoinVirtualColumns
)
{
  for (VirtualColumn virtualColumn : virtualColumns.getVirtualColumns()) {
    boolean needsJoin = areSomeColumnsFromJoin(joinableClauses, virtualColumn.requiredColumns());
    List<VirtualColumn> destination = needsJoin ? postJoinVirtualColumns : preJoinVirtualColumns;
    destination.add(virtualColumn);
  }
}
/**
 * A filter on a right-hand side (join table) column that is a candidate for being rewritten
 * into a filter on correlated base table columns.
 *
 * Holds the join clause the filtered column belongs to, the filtered RHS column name, and the
 * literal value the filter matches against; the (column, value) pair is used as the key when
 * computing and caching correlated base table values for the rewrite.
 */
private static class RhsRewriteCandidate
{
  // Join clause for the table that the filtered column belongs to.
  private final JoinableClause joinableClause;

  // Name of the filtered RHS column, including the join table prefix.
  private final String rhsColumn;

  // Literal value that the candidate filter matches against.
  private final String valueForRewrite;

  public RhsRewriteCandidate(
      JoinableClause joinableClause,
      String rhsColumn,
      String valueForRewrite
  )
  {
    this.joinableClause = joinableClause;
    this.rhsColumn = rhsColumn;
    this.valueForRewrite = valueForRewrite;
  }

  public JoinableClause getJoinableClause()
  {
    return joinableClause;
  }

  public String getRhsColumn()
  {
    return rhsColumn;
  }

  public String getValueForRewrite()
  {
    return valueForRewrite;
  }
}
}

View File

@ -19,10 +19,14 @@
package org.apache.druid.segment.join.filter;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.math.expr.Expr;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
@ -37,6 +41,7 @@ public class JoinFilterColumnCorrelationAnalysis
private final String joinColumn;
private final List<String> baseColumns;
private final List<Expr> baseExpressions;
private Map<Pair<String, String>, Optional<Set<String>>> correlatedValuesMap;
public JoinFilterColumnCorrelationAnalysis(
String joinColumn,
@ -48,6 +53,7 @@ public class JoinFilterColumnCorrelationAnalysis
this.baseColumns = new ArrayList<>(baseColumns);
this.baseExpressions = new ArrayList<>(baseExpressions);
this.baseColumns.sort(String.CASE_INSENSITIVE_ORDER);
this.correlatedValuesMap = new HashMap<>();
}
public String getJoinColumn()
@ -65,8 +71,13 @@ public class JoinFilterColumnCorrelationAnalysis
return baseExpressions;
}
public Map<Pair<String, String>, Optional<Set<String>>> getCorrelatedValuesMap()
{
return correlatedValuesMap;
}
public boolean supportsPushDown()
{
return !baseColumns.isEmpty() && baseExpressions.isEmpty();
return !baseColumns.isEmpty() || !baseExpressions.isEmpty();
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.join.JoinableClause;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* A JoinFilterPreAnalysis contains filter push down/rewrite information that does not have per-segment dependencies.
* This includes:
* - The query's JoinableClauses list
* - The query's original filter (if any)
* - A list of filter clauses from the original filter's CNF representation that only reference the base table
* - A list of filter clauses from the original filter's CNF representation that reference RHS join tables
* - A mapping of RHS filtering columns -> List<JoinFilterColumnCorrelationAnalysis>, used for filter rewrites
* - A list of virtual columns that can only be computed post-join
* - Control flag booleans for whether filter push down and RHS rewrites are enabled.
*/
public class JoinFilterPreAnalysis
{
  // The query's join clauses; used to determine which columns come from RHS join tables.
  private final List<JoinableClause> joinableClauses;
  // The query's original filter, before any splitting or rewriting. May be null.
  private final Filter originalFilter;
  // Clauses from the original filter's CNF form that only reference the base table.
  private final List<Filter> normalizedBaseTableClauses;
  // Clauses from the original filter's CNF form that reference RHS join tables.
  private final List<Filter> normalizedJoinTableClauses;
  // RHS filtering column -> correlation analyses used for filter rewrites. An empty Optional
  // means no usable correlations were found for that column, so its filters cannot be rewritten.
  private final Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationsByFilteringColumn;
  // Whether push down of filters to the base table is enabled.
  private final boolean enableFilterPushDown;
  // Whether rewriting of RHS column filters into base table filters is enabled.
  private final boolean enableFilterRewrite;
  // Virtual columns that can only be computed after the join.
  private final List<VirtualColumn> postJoinVirtualColumns;

  /**
   * @param joinableClauses               The query's join clauses
   * @param originalFilter                The query's original filter, may be null
   * @param postJoinVirtualColumns        Virtual columns that can only be computed post-join
   * @param normalizedBaseTableClauses    CNF clauses that only reference the base table
   * @param normalizedJoinTableClauses    CNF clauses that reference RHS join tables
   * @param correlationsByFilteringColumn RHS filtering column -> correlation analyses
   * @param enableFilterPushDown          Whether filter push down is enabled
   * @param enableFilterRewrite           Whether RHS filter rewrites are enabled
   */
  public JoinFilterPreAnalysis(
      final List<JoinableClause> joinableClauses,
      final Filter originalFilter,
      final List<VirtualColumn> postJoinVirtualColumns,
      final List<Filter> normalizedBaseTableClauses,
      final List<Filter> normalizedJoinTableClauses,
      final Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlationsByFilteringColumn,
      final boolean enableFilterPushDown,
      final boolean enableFilterRewrite
  )
  {
    this.joinableClauses = joinableClauses;
    this.originalFilter = originalFilter;
    this.postJoinVirtualColumns = postJoinVirtualColumns;
    this.normalizedBaseTableClauses = normalizedBaseTableClauses;
    this.normalizedJoinTableClauses = normalizedJoinTableClauses;
    this.correlationsByFilteringColumn = correlationsByFilteringColumn;
    this.enableFilterPushDown = enableFilterPushDown;
    this.enableFilterRewrite = enableFilterRewrite;
  }

  public List<JoinableClause> getJoinableClauses()
  {
    return joinableClauses;
  }

  public Filter getOriginalFilter()
  {
    return originalFilter;
  }

  public List<VirtualColumn> getPostJoinVirtualColumns()
  {
    return postJoinVirtualColumns;
  }

  public List<Filter> getNormalizedBaseTableClauses()
  {
    return normalizedBaseTableClauses;
  }

  public List<Filter> getNormalizedJoinTableClauses()
  {
    return normalizedJoinTableClauses;
  }

  public Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> getCorrelationsByFilteringColumn()
  {
    return correlationsByFilteringColumn;
  }

  public boolean isEnableFilterPushDown()
  {
    return enableFilterPushDown;
  }

  public boolean isEnableFilterRewrite()
  {
    return enableFilterRewrite;
  }
}

View File

@ -90,7 +90,9 @@ public class LookupJoinable implements Joinable
public Set<String> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName
String retrievalColumnName,
long maxCorrelationSetSize,
boolean allowNonKeyColumnSearch
)
{
Set<String> correlatedValues;
@ -101,9 +103,14 @@ public class LookupJoinable implements Joinable
correlatedValues = ImmutableSet.of(extractor.apply(searchColumnName));
}
} else {
if (!allowNonKeyColumnSearch) {
return ImmutableSet.of();
}
if (LookupColumnSelectorFactory.VALUE_COLUMN.equals(retrievalColumnName)) {
correlatedValues = ImmutableSet.of(searchColumnValue);
} else {
// Lookup extractor unapply only provides a list of strings, so we can't respect
// maxCorrelationSetSize easily. This should be handled eventually.
correlatedValues = ImmutableSet.copyOf(extractor.unapply(searchColumnValue));
}
}

View File

@ -84,7 +84,9 @@ public class IndexedTableJoinable implements Joinable
public Set<String> getCorrelatedColumnValues(
String searchColumnName,
String searchColumnValue,
String retrievalColumnName
String retrievalColumnName,
long maxCorrelationSetSize,
boolean allowNonKeyColumnSearch
)
{
int filterColumnPosition = table.rowSignature().indexOf(searchColumnName);
@ -102,15 +104,26 @@ public class IndexedTableJoinable implements Joinable
for (int i = 0; i < rowIndex.size(); i++) {
int rowNum = rowIndex.getInt(i);
correlatedValues.add(reader.read(rowNum).toString());
if (correlatedValues.size() > maxCorrelationSetSize) {
return ImmutableSet.of();
}
}
return correlatedValues;
} else {
if (!allowNonKeyColumnSearch) {
return ImmutableSet.of();
}
IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition);
IndexedTable.Reader correlatedColumnReader = table.columnReader(correlatedColumnPosition);
for (int i = 0; i < table.numRows(); i++) {
if (searchColumnValue.equals(dimNameReader.read(i).toString())) {
correlatedValues.add(correlatedColumnReader.read(i).toString());
}
if (correlatedValues.size() > maxCorrelationSetSize) {
return ImmutableSet.of();
}
}
return correlatedValues;

View File

@ -23,8 +23,12 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
@ -182,9 +186,20 @@ public class BaseHashJoinSegmentStorageAdapterTest
protected HashJoinSegmentStorageAdapter makeFactToCountrySegment()
{
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT)),
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
return new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT)),
preAnalysis
);
}

View File

@ -27,13 +27,17 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -41,6 +45,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorageAdapterTest
{
@ -295,10 +300,23 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryLeft()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
@ -351,10 +369,22 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryInner()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.INNER));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.INNER))
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
@ -401,10 +431,22 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryInnerUsingLookup()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.INNER));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.INNER))
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
@ -452,13 +494,25 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
// In non-SQL-compatible mode, we get an extra row, since the 'null' countryNumber for "Talk:Oswald Tilghman"
// is interpreted as 0 (a.k.a. Australia).
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnNumber(JoinType.INNER));
Filter filter = new SelectorDimFilter("channel", "#en.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnNumber(JoinType.INNER))
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", "#en.wikipedia", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -510,13 +564,25 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
// In non-SQL-compatible mode, we get an extra row, since the 'null' countryNumber for "Talk:Oswald Tilghman"
// is interpreted as 0 (a.k.a. Australia).
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingNumberLookup(JoinType.INNER));
Filter filter = new SelectorDimFilter("channel", "#en.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryNameUsingNumberLookup(JoinType.INNER))
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", "#en.wikipedia", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -564,12 +630,25 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryLeftWithFilterOnFacts()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -592,12 +671,25 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryRightWithFilterOnLeftIsNull()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT));
Filter filter = new SelectorDimFilter("channel", null, null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT))
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", null, null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -622,12 +714,25 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryFullWithFilterOnLeftIsNull()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.FULL));
Filter filter = new SelectorDimFilter("channel", null, null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.FULL))
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", null, null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -652,16 +757,30 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryRightWithFilterOnJoinable()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT));
Filter filter = new SelectorDimFilter(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName",
"Germany",
null
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT))
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName",
"Germany",
null
).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -685,16 +804,31 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryLeftWithFilterOnJoinable()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
Filter filter = new OrDimFilter(
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode", "DE", null),
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "Norway", null),
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber", "10", null)
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))
joinableClauses,
preAnalysis
).makeCursors(
new OrDimFilter(
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryIsoCode", "DE", null),
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", "Norway", null),
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber", "10", null)
).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -719,15 +853,29 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryLeftWithFilterOnJoinableUsingLookup()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.LEFT));
Filter filter = new OrDimFilter(
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "k", "DE", null),
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", "Norway", null)
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.LEFT))
joinableClauses,
preAnalysis
).makeCursors(
new OrDimFilter(
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "k", "DE", null),
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", "Norway", null)
).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -753,26 +901,41 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
// Join condition => always true.
// Filter => Fact to countries on countryIsoCode.
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.INNER,
JoinConditionAnalysis.forExpression(
"1",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
Filter filter = new ExpressionDimFilter(
StringUtils.format("\"%scountryIsoCode\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
ExprMacroTable.nil()
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.INNER,
JoinConditionAnalysis.forExpression(
"1",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
new ExpressionDimFilter(
StringUtils.format("\"%scountryIsoCode\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
ExprMacroTable.nil()
).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -817,13 +980,26 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToRegionToCountryLeft()
{
List<JoinableClause> joinableClauses = ImmutableList.of(
factToRegion(JoinType.LEFT),
regionToCountry(JoinType.LEFT)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
factToRegion(JoinType.LEFT),
regionToCountry(JoinType.LEFT)
)
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
@ -874,23 +1050,36 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryAlwaysTrue()
{
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"1",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"1",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -927,23 +1116,38 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryAlwaysFalse()
{
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"0",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"0",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -963,23 +1167,38 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryAlwaysTrueUsingLookup()
{
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
LookupJoinable.wrap(countryIsoCodeToNameLookup),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"1",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
LookupJoinable.wrap(countryIsoCodeToNameLookup),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"1",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -1016,23 +1235,38 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryAlwaysFalseUsingLookup()
{
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
LookupJoinable.wrap(countryIsoCodeToNameLookup),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"0",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
LookupJoinable.wrap(countryIsoCodeToNameLookup),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"0",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -1052,34 +1286,49 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryUsingVirtualColumn()
{
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.INNER,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%scountryIsoCode\" == virtual", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
VirtualColumns virtualColumns = VirtualColumns.create(
Collections.singletonList(
new ExpressionVirtualColumn(
"virtual",
"concat(substring(countryIsoCode, 0, 1),'L')",
ValueType.STRING,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
virtualColumns,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.INNER,
JoinConditionAnalysis.forExpression(
StringUtils.format("\"%scountryIsoCode\" == virtual", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.create(
Collections.singletonList(
new ExpressionVirtualColumn(
"virtual",
"concat(substring(countryIsoCode, 0, 1),'L')",
ValueType.STRING,
ExprMacroTable.nil()
)
)
),
virtualColumns,
Granularities.ALL,
false,
null
@ -1103,24 +1352,37 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryUsingExpression()
{
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.INNER,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%scountryIsoCode\" == concat(substring(countryIsoCode, 0, 1),'L')",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX
),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.INNER,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%scountryIsoCode\" == concat(substring(countryIsoCode, 0, 1),'L')",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX
),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
@ -1148,27 +1410,41 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
public void test_makeCursors_factToRegionTheWrongWay()
{
// Joins using only regionIsoCode, which is wrong since they are not unique internationally.
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_REGION_PREFIX,
new IndexedTableJoinable(regionsTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%sregionIsoCode\" == regionIsoCode",
FACT_TO_REGION_PREFIX
),
FACT_TO_REGION_PREFIX,
ExprMacroTable.nil()
)
)
);
Filter filter = new SelectorDimFilter("regionIsoCode", "VA", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_REGION_PREFIX,
new IndexedTableJoinable(regionsTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format(
"\"%sregionIsoCode\" == regionIsoCode",
FACT_TO_REGION_PREFIX
),
FACT_TO_REGION_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
new SelectorDimFilter("regionIsoCode", "VA", null).toFilter(),
filter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
@ -1199,21 +1475,34 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Cannot build hash-join matcher on non-equi-join condition: x == y");
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"x == y",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.readCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
"x == y",
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
@ -1232,22 +1521,34 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Cannot build hash-join matcher on non-key-based condition: "
+ "Equality{leftExpr=x, rightColumn='countryName'}");
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("x == \"%scountryName\"", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.readCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
new JoinableClause(
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
new IndexedTableJoinable(countriesTable),
JoinType.LEFT,
JoinConditionAnalysis.forExpression(
StringUtils.format("x == \"%scountryName\"", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX,
ExprMacroTable.nil()
)
)
)
joinableClauses,
preAnalysis
).makeCursors(
null,
Intervals.ETERNITY,
@ -1263,12 +1564,26 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
@Test
public void test_makeCursors_factToCountryLeft_filterExcludesAllLeftRows()
{
Filter originalFilter = new SelectorFilter("page", "this matches nothing");
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
originalFilter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))
joinableClauses,
preAnalysis
).makeCursors(
new SelectorFilter("page", "this matches nothing"),
originalFilter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,

View File

@ -24,6 +24,9 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
@ -36,6 +39,7 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.List;
public class HashJoinSegmentTest
{
@ -62,24 +66,35 @@ public class HashJoinSegmentTest
SegmentId.dummy("facts")
);
List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause(
"j0.",
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression("1", "j0.", ExprMacroTable.nil())
),
new JoinableClause(
"j1.",
new IndexedTableJoinable(JoinTestHelper.createRegionsIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression("1", "j1.", ExprMacroTable.nil())
)
);
JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
hashJoinSegment = new HashJoinSegment(
baseSegment,
ImmutableList.of(
new JoinableClause(
"j0.",
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression("1", "j0.", ExprMacroTable.nil())
),
new JoinableClause(
"j1.",
new IndexedTableJoinable(JoinTestHelper.createRegionsIndexedTable()),
JoinType.LEFT,
JoinConditionAnalysis.forExpression("1", "j1.", ExprMacroTable.nil())
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
joinableClauses,
joinFilterPreAnalysis
);
}
@ -89,11 +104,22 @@ public class HashJoinSegmentTest
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("'clauses' is empty, no need to create HashJoinSegment");
List<JoinableClause> joinableClauses = ImmutableList.of();
JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY
);
final HashJoinSegment ignored = new HashJoinSegment(
baseSegment,
ImmutableList.of(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
joinableClauses,
joinFilterPreAnalysis
);
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.junit.Assert;
@ -99,7 +100,11 @@ public class JoinablesTest
NoopJoinableFactory.INSTANCE,
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY,
null,
VirtualColumns.EMPTY
);
Assert.assertSame(Function.identity(), segmentMapFn);
@ -124,7 +129,11 @@ public class JoinablesTest
NoopJoinableFactory.INSTANCE,
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY,
null,
VirtualColumns.EMPTY
);
}
@ -157,7 +166,11 @@ public class JoinablesTest
},
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE_KEY,
null,
VirtualColumns.EMPTY
);
Assert.assertNotSame(Function.identity(), segmentMapFn);

View File

@ -176,7 +176,11 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
joinableFactory,
cpuTimeAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query)
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
);
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(

View File

@ -88,7 +88,11 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
joinableFactory,
cpuAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query)
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
);
final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(

View File

@ -197,7 +197,11 @@ public class ServerManager implements QuerySegmentWalker
joinableFactory,
cpuTimeAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query)
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
);
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable

View File

@ -407,7 +407,11 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
joinableFactory,
new AtomicLong(),
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query)
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
);
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(