Add query context option to disable join filter push down (#9335)

Jonathan Wei 2020-02-11 15:31:34 -08:00 committed by GitHub
parent a5c49cc4bd
commit b2c00b3a79
12 changed files with 160 additions and 37 deletions
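In short, this change wires a new per-query flag end to end: a context key ("enableJoinFilterPushDown", default true) in QueryContexts, a matching constructor parameter on HashJoinSegment and its storage adapter, an early-out in JoinFilterAnalyzer, and plumbing through the segment walkers. A minimal sketch of setting the flag, using only the key constant and read helper added below (building the context map by hand is an assumption for illustration):

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.druid.query.QueryContexts;

// Disable join filter push down for one query via its context map;
// omitting the key keeps the default behavior (push down enabled).
final Map<String, Object> context =
    ImmutableMap.<String, Object>of(QueryContexts.JOIN_FILTER_PUSH_DOWN_KEY, false);

// Server-side code reads the flag back with the helper added below:
//   boolean enabled = QueryContexts.getEnableJoinFilterPushDown(query);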

View File

@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.filter.SelectorDimFilter;
@@ -137,7 +138,8 @@ public class JoinAndLookupBenchmark
ExprMacroTable.nil()
)
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
@@ -153,7 +155,8 @@ public class JoinAndLookupBenchmark
ExprMacroTable.nil()
)
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
@@ -169,7 +172,8 @@ public class JoinAndLookupBenchmark
ExprMacroTable.nil()
)
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
@@ -185,7 +189,8 @@ public class JoinAndLookupBenchmark
ExprMacroTable.nil()
)
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();

View File

@@ -45,6 +45,7 @@ public class QueryContexts
public static final String BROKER_PARALLELISM = "parallelMergeParallelism";
public static final String VECTORIZE_KEY = "vectorize";
public static final String VECTOR_SIZE_KEY = "vectorSize";
public static final String JOIN_FILTER_PUSH_DOWN_KEY = "enableJoinFilterPushDown";
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
@@ -57,6 +58,7 @@ public class QueryContexts
public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final long NO_TIMEOUT = 0;
public static final boolean DEFAULT_ENABLE_PARALLEL_MERGE = true;
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true;
@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
@@ -218,6 +220,11 @@ public class QueryContexts
return parseInt(query, BROKER_PARALLELISM, defaultValue);
}
public static <T> boolean getEnableJoinFilterPushDown(Query<T> query)
{
return parseBoolean(query, JOIN_FILTER_PUSH_DOWN_KEY, DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN);
}
public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{
Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);

View File

@@ -40,14 +40,17 @@ public class HashJoinSegment extends AbstractSegment
{
private final Segment baseSegment;
private final List<JoinableClause> clauses;
private final boolean enableFilterPushDown;
public HashJoinSegment(
Segment baseSegment,
List<JoinableClause> clauses
List<JoinableClause> clauses,
boolean enableFilterPushDown
)
{
this.baseSegment = baseSegment;
this.clauses = clauses;
this.enableFilterPushDown = enableFilterPushDown;
// Verify 'clauses' is nonempty (otherwise it's a waste to create this object, and the caller should know)
if (clauses.isEmpty()) {
@@ -80,7 +83,7 @@ public class HashJoinSegment extends AbstractSegment
@Override
public StorageAdapter asStorageAdapter()
{
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses);
return new HashJoinSegmentStorageAdapter(baseSegment.asStorageAdapter(), clauses, enableFilterPushDown);
}
@Override

View File

@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Capabilities;
@@ -53,6 +54,7 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
{
private final StorageAdapter baseAdapter;
private final List<JoinableClause> clauses;
private final boolean enableFilterPushDown;
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
@@ -61,6 +63,18 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
{
this.baseAdapter = baseAdapter;
this.clauses = clauses;
this.enableFilterPushDown = QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN;
}
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses,
final boolean enableFilterPushDown
)
{
this.baseAdapter = baseAdapter;
this.clauses = clauses;
this.enableFilterPushDown = enableFilterPushDown;
}
@Override
@@ -227,7 +241,8 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(
this,
filter
filter,
enableFilterPushDown
);
preJoinVirtualColumns.addAll(joinFilterSplit.getPushDownVirtualColumns());
@@ -277,6 +292,11 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
return !getClauseForColumn(column).isPresent();
}
public boolean isEnableFilterPushDown()
{
return enableFilterPushDown;
}
/**
* Returns the JoinableClause corresponding to a particular column, based on the clauses' prefixes.
*

View File

@@ -74,7 +74,8 @@ public class Joinables
public static Function<Segment, Segment> createSegmentMapFn(
final List<PreJoinableClause> clauses,
final JoinableFactory joinableFactory,
final AtomicLong cpuTimeAccumulator
final AtomicLong cpuTimeAccumulator,
final boolean enableFilterPushDown
)
{
return JvmUtils.safeAccumulateThreadCpuTime(
@@ -84,7 +85,7 @@ public class Joinables
return Function.identity();
} else {
final List<JoinableClause> joinableClauses = createJoinableClauses(clauses, joinableFactory);
return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses);
return baseSegment -> new HashJoinSegment(baseSegment, joinableClauses, enableFilterPushDown);
}
}
);
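Condensed from the walker changes later in this commit (SinkQuerySegmentWalker, ServerManager), the call pattern at the new fourth argument looks like this; analysis, joinableFactory, cpuTimeAccumulator, and query are assumed in scope, as they are at those call sites:

final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
    analysis.getPreJoinableClauses(),                 // join clauses for this query
    joinableFactory,                                  // resolves clauses to concrete Joinables
    cpuTimeAccumulator,                               // CPU-time accounting
    QueryContexts.getEnableJoinFilterPushDown(query)  // the new per-query flag
);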

View File

@@ -50,11 +50,11 @@ import java.util.Set;
* When there is a filter in a join query, we can sometimes improve performance by applying parts of the filter
* when we first read from the base table instead of after the join.
*
* This class provides a {@link #splitFilter(HashJoinSegmentStorageAdapter, Filter)} method that
* This class provides a {@link #splitFilter(HashJoinSegmentStorageAdapter, Filter, boolean)} method that
* takes a filter and splits it into a portion that should be applied to the base table prior to the join, and a
* portion that should be applied after the join.
*
* The first step of the filter splitting is to convert the fllter into
* The first step of the filter splitting is to convert the filter into
* https://en.wikipedia.org/wiki/Conjunctive_normal_form (an AND of ORs). This allows us to consider each
* OR clause independently as a candidate for filter push down to the base table.
*
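To make the split concrete, a hypothetical example (the values are made up; "j1." stands for a join clause prefix): given the CNF filter page = 'A' AND j1.countryName = 'B', the first conjunct references only base-table columns and is pushed down, while the second references a join column, so it is rewritten against the equiconditions when possible and otherwise kept as a post-join filter:

// Sketch of the call shape; 'adapter' is a HashJoinSegmentStorageAdapter and
// 'originalFilter' the query's filter, both assumed in scope. The final
// argument is the new per-query push-down flag.
JoinFilterSplit split = JoinFilterAnalyzer.splitFilter(adapter, originalFilter, true);
// split.getBaseTableFilter()        -> portion applied to the base table before the join
// split.getPushDownVirtualColumns() -> virtual columns required by rewritten clauses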
@@ -73,7 +74,8 @@ public class JoinFilterAnalyzer
public static JoinFilterSplit splitFilter(
HashJoinSegmentStorageAdapter hashJoinSegmentStorageAdapter,
@Nullable Filter originalFilter
@Nullable Filter originalFilter,
boolean enableFilterPushDown
)
{
if (originalFilter == null) {
@@ -84,6 +85,14 @@ public class JoinFilterAnalyzer
);
}
if (!enableFilterPushDown) {
return new JoinFilterSplit(
null,
originalFilter,
ImmutableList.of()
);
}
Filter normalizedFilter = Filters.convertToCNF(originalFilter);
// build the prefix and equicondition maps

View File

@@ -22,6 +22,7 @@ package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.timeline.SegmentId;
@@ -76,7 +77,8 @@ public class HashJoinSegmentTest
JoinType.LEFT,
JoinConditionAnalysis.forExpression("1", "j1.", ExprMacroTable.nil())
)
)
),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
}
@@ -86,7 +88,11 @@ public class HashJoinSegmentTest
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("'clauses' is empty, no need to create HashJoinSegment");
final HashJoinSegment ignored = new HashJoinSegment(baseSegment, ImmutableList.of());
final HashJoinSegment ignored = new HashJoinSegment(
baseSegment,
ImmutableList.of(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
}
@Test

View File

@@ -66,7 +66,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -135,7 +136,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -188,7 +190,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -249,7 +252,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -309,7 +313,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -354,7 +359,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -532,7 +538,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -601,7 +608,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
ExpressionVirtualColumn expectedVirtualColumn = new ExpressionVirtualColumn(
"JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-0",
@@ -757,7 +765,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -833,7 +842,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
filter
filter,
true
);
Assert.assertEquals(
expectedFilterSplit.getBaseTableFilter(),
@@ -894,7 +904,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -947,7 +958,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -998,7 +1010,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1057,7 +1070,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1107,7 +1121,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
filter
filter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1160,7 +1175,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1222,7 +1238,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter
originalFilter,
true
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1244,6 +1261,51 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes
);
}
@Test
public void test_filterPushDown_factToRegionToCountryLeftFilterOnPageDisablePushDown()
{
HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter(
factSegment.asStorageAdapter(),
ImmutableList.of(
factToRegion(JoinType.LEFT),
regionToCountry(JoinType.LEFT)
),
false
);
Filter originalFilter = new SelectorFilter("page", "Peremptory norm");
JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
null,
new SelectorFilter("page", "Peremptory norm"),
ImmutableList.of()
);
JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter(
adapter,
originalFilter,
adapter.isEnableFilterPushDown()
);
Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
JoinTestHelper.verifyCursors(
adapter.makeCursors(
originalFilter,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
),
ImmutableList.of(
"page",
FACT_TO_REGION_PREFIX + "regionName",
REGION_TO_COUNTRY_PREFIX + "countryName"
),
ImmutableList.of(
new Object[]{"Peremptory norm", "New South Wales", "Australia"}
)
);
}
@Test
public void test_JoinFilterSplit_equals()

View File

@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment;
@@ -93,7 +94,8 @@ public class JoinablesTest
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
ImmutableList.of(),
NoopJoinableFactory.INSTANCE,
new AtomicLong()
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
Assert.assertSame(Function.identity(), segmentMapFn);
@@ -116,7 +118,8 @@ public class JoinablesTest
final Function<Segment, Segment> ignored = Joinables.createSegmentMapFn(
ImmutableList.of(clause),
NoopJoinableFactory.INSTANCE,
new AtomicLong()
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
}
@@ -147,7 +150,8 @@ public class JoinablesTest
return Optional.empty();
}
},
new AtomicLong()
new AtomicLong(),
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN
);
Assert.assertNotSame(Function.identity(), segmentMapFn);

View File

@@ -41,6 +41,7 @@ import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryRunner;
@@ -173,7 +174,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuTimeAccumulator
cpuTimeAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query)
);
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(

View File

@@ -39,6 +39,7 @@ import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.PerSegmentOptimizingQueryRunner;
import org.apache.druid.query.PerSegmentQueryOptimizationContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryRunner;
@@ -194,7 +195,8 @@ public class ServerManager implements QuerySegmentWalker
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuTimeAccumulator
cpuTimeAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query)
);
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable

View File

@@ -36,6 +36,7 @@ import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@@ -372,7 +373,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
new AtomicLong()
new AtomicLong(),
QueryContexts.getEnableJoinFilterPushDown(query)
);
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(