Fix join filter rewrites with nested queries (#10015)

* Fix join filter rewrites with nested queries

* Fix test, inspection, coverage

* Remove clauses from group key

* Fix import order

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
This commit is contained in:
Jonathan Wei 2020-06-18 21:32:29 -07:00 committed by GitHub
parent d644a27f1a
commit 37e150c075
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1623 additions and 1239 deletions

View File

@ -49,9 +49,8 @@ import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -141,19 +140,20 @@ public class JoinAndLookupBenchmark
)
)
);
JoinFilterPreAnalysis preAnalysisLookupStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesLookupStringKey),
VirtualColumns.EMPTY,
null,
JoinFilterPreAnalysisGroup preAnalysisGroupLookupStringKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);
hashJoinLookupStringKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupStringKey,
preAnalysisLookupStringKey
preAnalysisGroupLookupStringKey
);
List<JoinableClause> joinableClausesLookupLongKey = ImmutableList.of(
@ -168,19 +168,20 @@ public class JoinAndLookupBenchmark
)
)
);
JoinFilterPreAnalysis preAnalysisLookupLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesLookupLongKey),
VirtualColumns.EMPTY,
null,
JoinFilterPreAnalysisGroup preAnalysisGroupLookupLongKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupLongKey,
preAnalysisLookupLongKey
preAnalysisGroupLookupLongKey
);
List<JoinableClause> joinableClausesIndexedTableStringKey = ImmutableList.of(
@ -195,19 +196,20 @@ public class JoinAndLookupBenchmark
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesIndexedTableStringKey),
VirtualColumns.EMPTY,
null,
JoinFilterPreAnalysisGroup preAnalysisGroupIndexedStringKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableStringKey,
preAnalysisIndexedTableStringKey
preAnalysisGroupIndexedStringKey
);
List<JoinableClause> joinableClausesIndexedTableLonggKey = ImmutableList.of(
@ -222,19 +224,19 @@ public class JoinAndLookupBenchmark
)
)
);
JoinFilterPreAnalysis preAnalysisIndexedTableLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClausesIndexedTableLonggKey),
VirtualColumns.EMPTY,
null,
JoinFilterPreAnalysisGroup preAnalysisGroupIndexedLongKey = new JoinFilterPreAnalysisGroup(
new JoinFilterRewriteConfig(
false,
false,
false,
0
),
true
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableLonggKey,
preAnalysisIndexedTableLongKey
preAnalysisGroupIndexedLongKey
);
final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();

View File

@ -249,8 +249,8 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return subtotalsSpec;
}
@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;

View File

@ -183,8 +183,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return maxSegmentPartitionsOrderedInMemory;
}
@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;

View File

@ -108,8 +108,8 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return Query.TIMESERIES;
}
@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;

View File

@ -113,8 +113,8 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
return TOPN;
}
@Override
@JsonProperty
@Override
public VirtualColumns getVirtualColumns()
{
return virtualColumns;

View File

@ -25,7 +25,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@ -44,23 +44,24 @@ public class HashJoinSegment implements SegmentReference
{
private final SegmentReference baseSegment;
private final List<JoinableClause> clauses;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;
private final JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup;
/**
* @param baseSegment The left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysis Pre-analysis computed by {@link org.apache.druid.segment.join.filter.JoinFilterAnalyzer#computeJoinFilterPreAnalysis}
* @param joinFilterPreAnalysisGroup Pre-analysis group that holds all of the JoinFilterPreAnalysis results within
* the scope of a query
*/
public HashJoinSegment(
SegmentReference baseSegment,
List<JoinableClause> clauses,
JoinFilterPreAnalysis joinFilterPreAnalysis
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup
)
{
this.baseSegment = baseSegment;
this.clauses = clauses;
this.joinFilterPreAnalysis = joinFilterPreAnalysis;
this.joinFilterPreAnalysisGroup = joinFilterPreAnalysisGroup;
// Verify 'clauses' is nonempty (otherwise it's a waste to create this object, and the caller should know)
if (clauses.isEmpty()) {
@ -93,7 +94,7 @@ public class HashJoinSegment implements SegmentReference
@Override
public StorageAdapter asStorageAdapter()
{
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, joinFilterPreAnalysis);
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, joinFilterPreAnalysisGroup);
}
@Override

View File

@ -21,7 +21,6 @@ package org.apache.druid.segment.join;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -38,6 +37,7 @@ import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinFilterSplit;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -47,7 +47,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@ -55,22 +54,23 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
{
private final StorageAdapter baseAdapter;
private final List<JoinableClause> clauses;
private final JoinFilterPreAnalysis joinFilterPreAnalysis;
private final JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup;
/**
* @param baseAdapter A StorageAdapter for the left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param joinFilterPreAnalysisGroup Pre-analysis group that holds all of the JoinFilterPreAnalysis results within
* the scope of a query
*/
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses,
final JoinFilterPreAnalysis joinFilterPreAnalysis
final JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup
)
{
this.baseAdapter = baseAdapter;
this.clauses = clauses;
this.joinFilterPreAnalysis = joinFilterPreAnalysis;
this.joinFilterPreAnalysisGroup = joinFilterPreAnalysisGroup;
}
@Override
@ -209,13 +209,16 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
@Nullable final QueryMetrics<?> queryMetrics
)
{
if (!Objects.equals(joinFilterPreAnalysis.getOriginalFilter(), filter)) {
throw new ISE(
"Filter provided to cursor [%s] does not match join pre-analysis filter [%s]",
JoinFilterPreAnalysis jfpa;
if (joinFilterPreAnalysisGroup.isSingleLevelMode()) {
jfpa = joinFilterPreAnalysisGroup.getPreAnalysisForSingleLevelMode();
} else {
jfpa = joinFilterPreAnalysisGroup.getAnalysis(
filter,
joinFilterPreAnalysis.getOriginalFilter()
virtualColumns
);
}
final List<VirtualColumn> preJoinVirtualColumns = new ArrayList<>();
final List<VirtualColumn> postJoinVirtualColumns = new ArrayList<>();
@ -225,7 +228,7 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
postJoinVirtualColumns
);
JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(jfpa);
preJoinVirtualColumns.addAll(joinFilterSplit.getPushDownVirtualColumns());
// Soon, we will need a way to push filters past a join when possible. This could potentially be done right here

View File

@ -20,17 +20,20 @@
package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@ -71,33 +74,19 @@ public class Joinables
/**
* Creates a Function that maps base segments to {@link HashJoinSegment} if needed (i.e. if the number of join
* clauses is > 0). If mapping is not needed, this method will return {@link Function#identity()}.
*
* @param clauses pre-joinable clauses
* @param joinableFactory factory for joinables
* @param cpuTimeAccumulator an accumulator that we will add CPU nanos to; this is part of the function to encourage
* @param clauses Pre-joinable clauses
* @param joinableFactory Factory for joinables
* @param cpuTimeAccumulator An accumulator that we will add CPU nanos to; this is part of the function to encourage
* callers to remember to track metrics on CPU time required for creation of Joinables
* @param enableFilterPushDown whether to enable filter push down optimizations to the base segment. In production
* this should generally be {@code QueryContexts.getEnableJoinFilterPushDown(query)}.
* @param enableFilterRewrite whether to enable filter rewrite optimizations for RHS columns. In production
* this should generally be {@code QueryContexts.getEnableJoinFilterRewrite(query)}.
* @param enableRewriteValueColumnFilters whether to enable filter rewrite optimizations for RHS columns that are not
* key columns. In production this should generally
* be {@code QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query)}.
* @param filterRewriteMaxSize the max allowed size of correlated value sets for RHS rewrites. In production
* this should generally be {@code QueryContexts.getJoinFilterRewriteMaxSize(query)}.
* @param originalFilter The original filter from the query.
* @param virtualColumns The virtual columns from the query.
* @param joinFilterRewriteConfig Configuration options for the join filter rewrites
* @param query The query being processed
*/
public static Function<SegmentReference, SegmentReference> createSegmentMapFn(
final List<PreJoinableClause> clauses,
final JoinableFactory joinableFactory,
final AtomicLong cpuTimeAccumulator,
final boolean enableFilterPushDown,
final boolean enableFilterRewrite,
final boolean enableRewriteValueColumnFilters,
final long filterRewriteMaxSize,
final Filter originalFilter,
final VirtualColumns virtualColumns
final JoinFilterRewriteConfig joinFilterRewriteConfig,
final Query query
)
{
// compute column correlations here and RHS correlated values
@ -108,21 +97,85 @@ public class Joinables
return Function.identity();
} else {
final JoinableClauses joinableClauses = JoinableClauses.createClauses(clauses, joinableFactory);
JoinFilterPreAnalysis jfpa = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
joinableClauses,
virtualColumns,
originalFilter,
enableFilterPushDown,
enableFilterRewrite,
enableRewriteValueColumnFilters,
filterRewriteMaxSize
List<Query> joinQueryLevels = new ArrayList<>();
Joinables.gatherAllJoinQueryLevels(query, joinQueryLevels);
final JoinFilterPreAnalysisGroup preAnalysisGroup = new JoinFilterPreAnalysisGroup(
joinFilterRewriteConfig,
joinQueryLevels.size() <= 1 // use single-level mode if there's one or fewer query levels with joins
);
return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses.getJoinableClauses(), jfpa);
for (Query joinQuery : joinQueryLevels) {
preAnalysisGroup.computeJoinFilterPreAnalysisIfAbsent(
joinQuery.getFilter() == null ? null : joinQuery.getFilter().toFilter(),
joinableClauses.getJoinableClauses(),
joinQuery.getVirtualColumns()
);
}
return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses.getJoinableClauses(), preAnalysisGroup);
}
}
);
}
/**
* Walks a query and its subqueries, finding any queries that read from a JoinDatasource,
* and adding them to a list provided by the caller.
*
* @param currentLevelQuery The query to analyze
* @param allJoinQueryLevels A mutable list provided by the caller.
*/
public static void gatherAllJoinQueryLevels(Query currentLevelQuery, List<Query> allJoinQueryLevels)
{
DataSource currentDatasource = currentLevelQuery.getDataSource();
if (currentDatasource instanceof QueryDataSource) {
gatherAllJoinQueryLevels(
((QueryDataSource) currentDatasource).getQuery(),
allJoinQueryLevels
);
}
if (currentDatasource instanceof JoinDataSource) {
allJoinQueryLevels.add(currentLevelQuery);
gatherAllJoinQueryLevelsJoinDatasourceHelper(
(JoinDataSource) currentDatasource,
allJoinQueryLevels
);
}
}
private static void gatherAllJoinQueryLevelsJoinDatasourceHelper(
JoinDataSource joinDatasource,
List<Query> allJoinQueryLevels
)
{
if (joinDatasource.getLeft() instanceof QueryDataSource) {
gatherAllJoinQueryLevels(
((QueryDataSource) joinDatasource.getLeft()).getQuery(),
allJoinQueryLevels
);
}
if (joinDatasource.getLeft() instanceof JoinDataSource) {
gatherAllJoinQueryLevelsJoinDatasourceHelper(
(JoinDataSource) joinDatasource.getLeft(),
allJoinQueryLevels
);
}
if (joinDatasource.getRight() instanceof QueryDataSource) {
gatherAllJoinQueryLevels(
((QueryDataSource) joinDatasource.getRight()).getQuery(),
allJoinQueryLevels
);
}
if (joinDatasource.getRight() instanceof JoinDataSource) {
gatherAllJoinQueryLevelsJoinDatasourceHelper(
(JoinDataSource) joinDatasource.getRight(),
allJoinQueryLevels
);
}
}
/**
* Check if any prefixes in the provided list duplicate or shadow each other.
*

View File

@ -32,6 +32,7 @@ import org.apache.druid.segment.filter.FalseFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import java.util.ArrayList;
@ -85,23 +86,14 @@ public class JoinFilterAnalyzer
* @param joinableClauses The joinable clauses from the query
* @param virtualColumns The virtual columns from the query
* @param originalFilter The original filter from the query
* @param enableFilterPushDown Whether to enable filter push down
* @param enableFilterRewrite Whether to enable rewrites of filters involving RHS columns
* @param enableRewriteValueColumnFilters Whether to enable rewrites of filters invovling RHS non-key columns
* @param filterRewriteMaxSize The maximum size of the correlated value set for rewritten filters.
* If the correlated value set size exceeds this, the filter will not be
* rewritten and pushed down.
*
* @param joinFilterRewriteConfig Configuration options for the join rewrites
* @return A JoinFilterPreAnalysis containing information determined in this pre-analysis step.
*/
public static JoinFilterPreAnalysis computeJoinFilterPreAnalysis(
JoinableClauses joinableClauses,
VirtualColumns virtualColumns,
Filter originalFilter,
boolean enableFilterPushDown,
boolean enableFilterRewrite,
boolean enableRewriteValueColumnFilters,
long filterRewriteMaxSize
JoinFilterRewriteConfig joinFilterRewriteConfig
)
{
final List<VirtualColumn> preJoinVirtualColumns = new ArrayList<>();
@ -109,10 +101,8 @@ public class JoinFilterAnalyzer
joinableClauses.splitVirtualColumns(virtualColumns, preJoinVirtualColumns, postJoinVirtualColumns);
JoinFilterPreAnalysis.Builder preAnalysisBuilder =
new JoinFilterPreAnalysis.Builder(joinableClauses, originalFilter, postJoinVirtualColumns)
.withEnableFilterPushDown(enableFilterPushDown)
.withEnableFilterRewrite(enableFilterRewrite);
if (originalFilter == null || !enableFilterPushDown) {
new JoinFilterPreAnalysis.Builder(joinFilterRewriteConfig, joinableClauses, originalFilter, postJoinVirtualColumns);
if (originalFilter == null || !joinFilterRewriteConfig.isEnableFilterPushDown()) {
return preAnalysisBuilder.build();
}
@ -135,7 +125,7 @@ public class JoinFilterAnalyzer
preAnalysisBuilder
.withNormalizedBaseTableClauses(normalizedBaseTableClauses)
.withNormalizedJoinTableClauses(normalizedJoinTableClauses);
if (!enableFilterRewrite) {
if (!joinFilterRewriteConfig.isEnableFilterRewrite()) {
return preAnalysisBuilder.build();
}
@ -146,8 +136,8 @@ public class JoinFilterAnalyzer
normalizedJoinTableClauses,
equiconditions,
joinableClauses,
enableRewriteValueColumnFilters,
filterRewriteMaxSize
joinFilterRewriteConfig.isEnableRewriteValueColumnFilters(),
joinFilterRewriteConfig.getFilterRewriteMaxSize()
);
return preAnalysisBuilder.withCorrelations(correlations)

View File

@ -24,6 +24,7 @@ import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.join.Equality;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -38,11 +39,11 @@ import java.util.Set;
* A JoinFilterPreAnalysis contains filter push down/rewrite information that does not have per-segment dependencies.
* This includes:
* - The query's JoinableClauses list
* - The query's original filter (if any)
* - The original filter that an analysis was performed ons
* - A list of filter clauses from the original filter's CNF representation that only reference the base table
* - A list of filter clauses from the original filter's CNF representation that reference RHS join tables
* - A list of virtual columns that can only be computed post-join
* - Control flag booleans for whether filter push down and RHS rewrites are enabled.
* - The JoinFilterRewriteConfig that this pre-analysis is associated with.
*/
public class JoinFilterPreAnalysis
{
@ -51,10 +52,9 @@ public class JoinFilterPreAnalysis
private final List<Filter> normalizedBaseTableClauses;
private final List<Filter> normalizedJoinTableClauses;
private final JoinFilterCorrelations correlations;
private final boolean enableFilterPushDown;
private final boolean enableFilterRewrite;
private final List<VirtualColumn> postJoinVirtualColumns;
private final Equiconditions equiconditions;
private final JoinFilterRewriteConfig rewriteConfig;
private JoinFilterPreAnalysis(
final JoinableClauses joinableClauses,
@ -63,9 +63,8 @@ public class JoinFilterPreAnalysis
final List<Filter> normalizedBaseTableClauses,
final List<Filter> normalizedJoinTableClauses,
JoinFilterCorrelations correlations,
final boolean enableFilterPushDown,
final boolean enableFilterRewrite,
final Equiconditions equiconditions
final Equiconditions equiconditions,
final JoinFilterRewriteConfig rewriteConfig
)
{
this.joinableClauses = joinableClauses;
@ -74,8 +73,7 @@ public class JoinFilterPreAnalysis
this.normalizedBaseTableClauses = normalizedBaseTableClauses;
this.normalizedJoinTableClauses = normalizedJoinTableClauses;
this.correlations = correlations;
this.enableFilterPushDown = enableFilterPushDown;
this.enableFilterRewrite = enableFilterRewrite;
this.rewriteConfig = rewriteConfig;
this.equiconditions = equiconditions;
}
@ -116,12 +114,12 @@ public class JoinFilterPreAnalysis
public boolean isEnableFilterPushDown()
{
return enableFilterPushDown;
return rewriteConfig.isEnableFilterPushDown();
}
public boolean isEnableFilterRewrite()
{
return enableFilterRewrite;
return rewriteConfig.isEnableFilterRewrite();
}
public Equiconditions getEquiconditions()
@ -134,22 +132,23 @@ public class JoinFilterPreAnalysis
*/
public static class Builder
{
@Nonnull private final JoinFilterRewriteConfig rewriteConfig;
@Nonnull private final JoinableClauses joinableClauses;
@Nullable private final Filter originalFilter;
@Nullable private List<Filter> normalizedBaseTableClauses;
@Nullable private List<Filter> normalizedJoinTableClauses;
@Nullable private JoinFilterCorrelations correlations;
private boolean enableFilterPushDown = false;
private boolean enableFilterRewrite = false;
@Nonnull private final List<VirtualColumn> postJoinVirtualColumns;
@Nonnull private Equiconditions equiconditions = new Equiconditions(Collections.emptyMap());
public Builder(
@Nonnull JoinFilterRewriteConfig rewriteConfig,
@Nonnull JoinableClauses joinableClauses,
@Nullable Filter originalFilter,
@Nonnull List<VirtualColumn> postJoinVirtualColumns
)
{
this.rewriteConfig = rewriteConfig;
this.joinableClauses = joinableClauses;
this.originalFilter = originalFilter;
this.postJoinVirtualColumns = postJoinVirtualColumns;
@ -175,18 +174,6 @@ public class JoinFilterPreAnalysis
return this;
}
public Builder withEnableFilterPushDown(boolean enableFilterPushDown)
{
this.enableFilterPushDown = enableFilterPushDown;
return this;
}
public Builder withEnableFilterRewrite(boolean enableFilterRewrite)
{
this.enableFilterRewrite = enableFilterRewrite;
return this;
}
public Equiconditions computeEquiconditionsFromJoinableClauses()
{
Map<String, Set<Expr>> equiconditionsMap = new HashMap<>();
@ -212,9 +199,8 @@ public class JoinFilterPreAnalysis
normalizedBaseTableClauses,
normalizedJoinTableClauses,
correlations,
enableFilterPushDown,
enableFilterRewrite,
equiconditions
equiconditions,
rewriteConfig
);
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter.rewrite;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.JoinableClause;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* A JoinFilterPreAnalysisGroup holds all of the JoinFilterPreAnalysis objects for a given query and
* also stores the per-query parameters that control the filter rewrite operations (from the query context).
*
* The analyses map is keyed by (Filter, JoinableClause list, VirtualColumns): each Filter in the map belongs to a
* separate level of query (e.g. outer query, subquery level 1, etc.)
*
* If there is only a single Filter, then this class does not use the analyses map, instead of using a single reference
* for efficiency reasons.
*/
public class JoinFilterPreAnalysisGroup
{
private final JoinFilterRewriteConfig joinFilterRewriteConfig;
private final Map<JoinFilterPreAnalysisGroupKey, JoinFilterPreAnalysis> analyses;
private final boolean isSingleLevelMode;
/**
* Hashing and comparing filters can be expensive for large filters, so if we're only dealing with
* a single level of join query, then we can be more efficient by using a single reference instead of a map.
*/
private JoinFilterPreAnalysis preAnalysisForSingleLevelMode;
public JoinFilterPreAnalysisGroup(
JoinFilterRewriteConfig joinFilterRewriteConfig,
boolean isSingleLevelMode
)
{
this.joinFilterRewriteConfig = joinFilterRewriteConfig;
this.analyses = new HashMap<>();
this.isSingleLevelMode = isSingleLevelMode;
}
public boolean isSingleLevelMode()
{
return isSingleLevelMode;
}
public JoinFilterPreAnalysis computeJoinFilterPreAnalysisIfAbsent(
Filter filter,
List<JoinableClause> clauses,
VirtualColumns virtualColumns
)
{
if (isSingleLevelMode) {
preAnalysisForSingleLevelMode = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(clauses),
virtualColumns,
filter,
joinFilterRewriteConfig
);
return preAnalysisForSingleLevelMode;
}
JoinFilterPreAnalysisGroupKey key = new JoinFilterPreAnalysisGroupKey(filter, virtualColumns);
return analyses.computeIfAbsent(
key,
(groupKey) -> {
return JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(clauses),
virtualColumns,
filter,
joinFilterRewriteConfig
);
}
);
}
public JoinFilterPreAnalysis getAnalysis(
Filter filter,
VirtualColumns virtualColumns
)
{
JoinFilterPreAnalysisGroupKey key = new JoinFilterPreAnalysisGroupKey(filter, virtualColumns);
return analyses.get(key);
}
public JoinFilterPreAnalysis getPreAnalysisForSingleLevelMode()
{
return preAnalysisForSingleLevelMode;
}
public static class JoinFilterPreAnalysisGroupKey
{
private final Filter filter;
private final VirtualColumns virtualColumns;
public JoinFilterPreAnalysisGroupKey(
Filter filter,
VirtualColumns virtualColumns
)
{
this.filter = filter;
this.virtualColumns = virtualColumns;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JoinFilterPreAnalysisGroupKey that = (JoinFilterPreAnalysisGroupKey) o;
return Objects.equals(filter, that.filter) &&
Objects.equals(virtualColumns, that.virtualColumns);
}
@Override
public int hashCode()
{
return Objects.hash(filter, virtualColumns);
}
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.filter.rewrite;
/**
* A config class that holds properties that control how join filter rewrites behave.
*/
public class JoinFilterRewriteConfig
{
/**
* Whether to enable filter push down optimizations to the base segment.
* In production this should generally be {@code QueryContexts.getEnableJoinFilterPushDown(query)}.
*/
private final boolean enableFilterPushDown;
/**
* Whether to enable filter rewrite optimizations for RHS columns.
* In production this should generally be {@code QueryContexts.getEnableJoinFilterRewrite(query)}.
*/
private final boolean enableFilterRewrite;
/**
* Whether to enable filter rewrite optimizations for RHS columns that are not key columns.
* In production this should generally be {@code QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query)}.
*/
private final boolean enableRewriteValueColumnFilters;
/**
* The max allowed size of correlated value sets for RHS rewrites. In production
* This should generally be {@code QueryContexts.getJoinFilterRewriteMaxSize(query)}.
*/
private final long filterRewriteMaxSize;
public JoinFilterRewriteConfig(
boolean enableFilterPushDown,
boolean enableFilterRewrite,
boolean enableRewriteValueColumnFilters,
long filterRewriteMaxSize
)
{
this.enableFilterPushDown = enableFilterPushDown;
this.enableFilterRewrite = enableFilterRewrite;
this.enableRewriteValueColumnFilters = enableRewriteValueColumnFilters;
this.filterRewriteMaxSize = filterRewriteMaxSize;
}
public boolean isEnableFilterPushDown()
{
return enableFilterPushDown;
}
public boolean isEnableFilterRewrite()
{
return enableFilterRewrite;
}
public boolean isEnableRewriteValueColumnFilters()
{
return enableRewriteValueColumnFilters;
}
public long getFilterRewriteMaxSize()
{
return filterRewriteMaxSize;
}
}

View File

@ -24,12 +24,12 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
@ -44,9 +44,17 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.List;
public class BaseHashJoinSegmentStorageAdapterTest
{
public static JoinFilterRewriteConfig DEFAULT_JOIN_FILTER_REWRITE_CONFIG = new JoinFilterRewriteConfig(
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
);
public static final String FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX = "c1.";
public static final String FACT_TO_COUNTRY_ON_NUMBER_PREFIX = "c2.";
public static final String FACT_TO_REGION_PREFIX = "r1.";
@ -187,20 +195,12 @@ public class BaseHashJoinSegmentStorageAdapterTest
protected HashJoinSegmentStorageAdapter makeFactToCountrySegment()
{
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT))),
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
);
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup();
return new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT)),
preAnalysis
joinFilterPreAnalysisGroup
);
}
@ -222,4 +222,32 @@ public class BaseHashJoinSegmentStorageAdapterTest
actualVirtualColumn.getParsedExpression().get().toString()
);
}
protected static JoinFilterPreAnalysisGroup makeDefaultConfigPreAnalysisGroup()
{
return new JoinFilterPreAnalysisGroup(
DEFAULT_JOIN_FILTER_REWRITE_CONFIG,
true
);
}
protected static JoinFilterPreAnalysisGroup makeDefaultConfigPreAnalysisGroup(
Filter originalFilter,
List<JoinableClause> joinableClauses,
VirtualColumns virtualColumns
)
{
JoinFilterPreAnalysisGroup group = new JoinFilterPreAnalysisGroup(
DEFAULT_JOIN_FILTER_REWRITE_CONFIG,
true
);
group.computeJoinFilterPreAnalysisIfAbsent(
originalFilter,
joinableClauses,
virtualColumns
);
return group;
}
}

View File

@ -23,12 +23,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.OrDimFilter;
@ -37,9 +35,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -302,21 +298,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -371,21 +362,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.LEFT));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -438,21 +424,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
public void test_makeCursors_factToCountryInner()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.INNER));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -500,21 +481,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
public void test_makeCursors_factToCountryInnerUsingLookup()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.INNER));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -564,21 +540,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
// is interpreted as 0 (a.k.a. Australia).
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnNumber(JoinType.INNER));
Filter filter = new SelectorDimFilter("channel", "#en.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -634,21 +605,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
// is interpreted as 0 (a.k.a. Australia).
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingNumberLookup(JoinType.INNER));
Filter filter = new SelectorDimFilter("channel", "#en.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -700,21 +666,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -741,21 +702,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.LEFT));
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -781,21 +737,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.RIGHT));
Filter filter = new SelectorDimFilter("channel", null, null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -824,21 +775,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.RIGHT));
Filter filter = new SelectorDimFilter("channel", null, null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -866,21 +812,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.FULL));
Filter filter = new SelectorDimFilter("channel", null, null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -909,21 +850,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.FULL));
Filter filter = new SelectorDimFilter("channel", null, null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -956,21 +892,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
null
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1003,21 +934,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
null
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1050,21 +976,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryNumber", "10", null)
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1097,21 +1018,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
new SelectorDimFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", "Norway", null)
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1157,21 +1073,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
ExprMacroTable.nil()
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1238,22 +1149,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
StringUtils.format("\"%sk\" == countryIsoCode", FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX),
ExprMacroTable.nil()
).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1303,22 +1208,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
factToRegion(JoinType.LEFT),
regionToCountry(JoinType.LEFT)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -1383,20 +1282,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
);
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1450,21 +1345,17 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1501,21 +1392,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1569,21 +1455,17 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
Filter filter = new SelectorDimFilter("channel", "#de.wikipedia", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1629,21 +1511,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
virtualColumns,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
virtualColumns
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -1695,21 +1572,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
virtualColumns,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
virtualColumns
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -1753,21 +1625,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -1810,21 +1677,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -1869,22 +1731,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
);
Filter filter = new SelectorDimFilter("regionIsoCode", "VA", null).toFilter();
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -1930,21 +1786,17 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.readCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -1976,21 +1828,17 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.readCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -2022,21 +1870,17 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.readCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -2067,21 +1911,17 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
)
);
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.readCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
null,
Intervals.ETERNITY,
@ -2100,21 +1940,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
Filter originalFilter = new SelectorFilter("page", "this matches nothing");
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
originalFilter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
originalFilter,
Intervals.ETERNITY,
@ -2139,21 +1974,16 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
{
Filter originalFilter = new SelectorFilter("page", "this matches nothing");
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryNameUsingIsoCodeLookup(JoinType.LEFT));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
originalFilter,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
joinableClauses,
VirtualColumns.EMPTY
);
JoinTestHelper.verifyCursors(
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
originalFilter,
Intervals.ETERNITY,
@ -2176,23 +2006,17 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
public void test_makeCursors_originalFilterDoesNotMatchPreAnalysis_shouldThrowISE()
{
List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
JoinFilterPreAnalysis preAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
);
Filter filter = new SelectorFilter("page", "this matches nothing");
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = makeDefaultConfigPreAnalysisGroup(
filter,
joinableClauses,
VirtualColumns.EMPTY
);
try {
new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
joinableClauses,
preAnalysis
joinFilterPreAnalysisGroup
).makeCursors(
filter,
Intervals.ETERNITY,
@ -2201,10 +2025,6 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag
false,
null
);
Assert.fail();
}
catch (ISE e) {
Assert.assertTrue(e.getMessage().startsWith("Filter provided to cursor ["));
}
}
}

View File

@ -28,10 +28,8 @@ import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
@ -52,6 +50,13 @@ import java.util.Optional;
public class HashJoinSegmentTest extends InitializedNullHandlingTest
{
private JoinFilterRewriteConfig DEFAULT_JOIN_FILTER_REWRITE_CONFIG = new JoinFilterRewriteConfig(
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -129,14 +134,9 @@ public class HashJoinSegmentTest extends InitializedNullHandlingTest
)
);
JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = new JoinFilterPreAnalysisGroup(
DEFAULT_JOIN_FILTER_REWRITE_CONFIG,
true
);
referencedSegment = ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment);
@ -188,7 +188,7 @@ public class HashJoinSegmentTest extends InitializedNullHandlingTest
hashJoinSegment = new HashJoinSegment(
testWrapper,
joinableClauses,
joinFilterPreAnalysis
joinFilterPreAnalysisGroup
)
{
@Override
@ -213,20 +213,15 @@ public class HashJoinSegmentTest extends InitializedNullHandlingTest
List<JoinableClause> joinableClauses = ImmutableList.of();
JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
JoinableClauses.fromList(joinableClauses),
VirtualColumns.EMPTY,
null,
true,
true,
true,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
JoinFilterPreAnalysisGroup joinFilterPreAnalysisGroup = new JoinFilterPreAnalysisGroup(
DEFAULT_JOIN_FILTER_REWRITE_CONFIG,
true
);
final HashJoinSegment ignored = new HashJoinSegment(
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClauses,
joinFilterPreAnalysis
joinFilterPreAnalysisGroup
);
}

View File

@ -22,22 +22,34 @@ package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.TestQuery;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
@ -45,6 +57,13 @@ import java.util.function.Function;
public class JoinablesTest
{
private static final JoinFilterRewriteConfig DEFAULT_JOIN_FILTER_REWRITE_CONFIG = new JoinFilterRewriteConfig(
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -100,12 +119,8 @@ public class JoinablesTest
ImmutableList.of(),
NoopJoinableFactory.INSTANCE,
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE,
null,
VirtualColumns.EMPTY
DEFAULT_JOIN_FILTER_REWRITE_CONFIG,
null
);
Assert.assertSame(Function.identity(), segmentMapFn);
@ -129,12 +144,8 @@ public class JoinablesTest
ImmutableList.of(clause),
NoopJoinableFactory.INSTANCE,
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE,
null,
VirtualColumns.EMPTY
DEFAULT_JOIN_FILTER_REWRITE_CONFIG,
null
);
}
@ -177,12 +188,13 @@ public class JoinablesTest
}
},
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS,
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE,
null,
VirtualColumns.EMPTY
DEFAULT_JOIN_FILTER_REWRITE_CONFIG,
new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
new HashMap()
)
);
Assert.assertNotSame(Function.identity(), segmentMapFn);
@ -236,4 +248,83 @@ public class JoinablesTest
Joinables.checkPrefixesForDuplicatesAndShadowing(prefixes);
}
@Test
public void test_gatherAllJoinQueryLevels()
{
Query query1 = new GroupByQuery.Builder()
.addDimension("dim1")
.setDataSource(
JoinDataSource.create(
new NoopDataSource(),
new NoopDataSource(),
"111",
"1",
JoinType.LEFT,
TestExprMacroTable.INSTANCE
)
)
.setInterval("1999/2000")
.setGranularity(Granularities.YEAR)
.build();
Query query2 = new GroupByQuery.Builder()
.addDimension("dim2")
.setDataSource(
JoinDataSource.create(
new NoopDataSource(),
new NoopDataSource(),
"222",
"1",
JoinType.LEFT,
TestExprMacroTable.INSTANCE
)
)
.setInterval("1999/2000")
.setGranularity(Granularities.YEAR)
.build();
QueryDataSource queryDataSource1 = new QueryDataSource(query1);
QueryDataSource queryDataSource2 = new QueryDataSource(query2);
Query query3 = new GroupByQuery.Builder()
.addDimension("dim2")
.setDataSource(
JoinDataSource.create(
JoinDataSource.create(
queryDataSource1,
new NoopDataSource(),
"444",
"4",
JoinType.LEFT,
TestExprMacroTable.INSTANCE
),
queryDataSource2,
"333",
"3",
JoinType.LEFT,
TestExprMacroTable.INSTANCE
)
)
.setInterval("1999/2000")
.setGranularity(Granularities.YEAR)
.build();
Query queryOuter = new GroupByQuery.Builder()
.addDimension("dim")
.setDataSource(
new QueryDataSource(query3)
)
.setInterval("1999/2000")
.setGranularity(Granularities.YEAR)
.build();
List<Query> joinQueryLevels = new ArrayList<>();
Joinables.gatherAllJoinQueryLevels(queryOuter, joinQueryLevels);
Assert.assertEquals(
ImmutableList.of(query3, query1, query2),
joinQueryLevels
);
}
}

View File

@ -59,6 +59,7 @@ import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.timeline.SegmentId;
@ -169,17 +170,20 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
}
final JoinFilterRewriteConfig joinFilterRewriteConfig = new JoinFilterRewriteConfig(
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query)
);
// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuTimeAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
joinFilterRewriteConfig,
query
);
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(

View File

@ -42,6 +42,7 @@ import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.joda.time.Interval;
import java.util.HashSet;
@ -100,16 +101,18 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
final Query<T> prioritizedAndLaned = prioritizeAndLaneQuery(query, segments);
final AtomicLong cpuAccumulator = new AtomicLong(0L);
final JoinFilterRewriteConfig joinFilterRewriteConfig = new JoinFilterRewriteConfig(
QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned),
QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned),
QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned)
);
final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuAccumulator,
QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned),
QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned),
QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned),
prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(),
prioritizedAndLaned.getVirtualColumns()
joinFilterRewriteConfig,
query
);
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned);

View File

@ -57,6 +57,7 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.initialization.ServerConfig;
@ -192,17 +193,20 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<>();
}
final JoinFilterRewriteConfig joinFilterRewriteConfig = new JoinFilterRewriteConfig(
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query)
);
// segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuTimeAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
joinFilterRewriteConfig,
query
);
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable

View File

@ -48,6 +48,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
@ -139,17 +140,19 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
&& !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
}
final JoinFilterRewriteConfig joinFilterRewriteConfig = new JoinFilterRewriteConfig(
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query)
);
final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
new AtomicLong(),
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
joinFilterRewriteConfig,
query
);
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(

View File

@ -31,7 +31,6 @@ import org.apache.druid.annotations.UsedByJUnitParamsRunner;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
@ -11917,7 +11916,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Parameters(source = QueryContextForJoinProvider.class)
public void testNestedGroupByOnInlineDataSourceWithFilter(Map<String, Object> queryContext) throws Exception
{
try {
// Cannot vectorize due to virtual columns.
cannotVectorize();
testQuery(
"with abc as"
+ "("
@ -11944,7 +11945,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Intervals.of("2001-01-02T00:00:00.000Z/146140482-04-24T15:36:27.903Z")))
.columns("dim1", "m2")
.columns("dim1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
@ -11958,7 +11959,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.context(queryContext)
.build()
),
"j0",
"j0.",
equalsCondition(
DruidExpression.fromColumn("dim1"),
DruidExpression.fromColumn("j0.dim1")
@ -11968,28 +11969,22 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setGranularity(Granularities.ALL)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim1", "def", null))
.setDimensions(
dimensions(
new DefaultDimensionSpec("v0", "d0")
)
)
.setVirtualColumns(expressionVirtualColumn("v0", "'def'", ValueType.STRING))
.build()
)
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setGranularity(Granularities.ALL)
.setInterval(querySegmentSpec(Filtration.eternity()))
.build()
),
ImmutableList.of(new Object[] {1})
ImmutableList.of(new Object[] {1L})
);
Assert.fail("Expected an ISE to be thrown");
}
catch (RuntimeException e) {
Throwable cause = e.getCause();
boolean foundISE = false;
while (cause != null) {
if (cause instanceof ISE) {
foundISE = true;
break;
}
cause = cause.getCause();
}
Assert.assertTrue(foundISE);
}
}
@Test