diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java index 676e6e78b5e..81cdaa8d9ac 100644 --- a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java +++ b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java @@ -79,9 +79,9 @@ public class ColumnProcessors final ColumnSelectorFactory selectorFactory ) { - if (expr.getIdentifierIfIdentifier() != null) { + if (expr.getBindingIfIdentifier() != null) { // If expr is an identifier, treat this the same way as a direct column reference. - return makeProcessor(expr.getIdentifierIfIdentifier(), processorFactory, selectorFactory); + return makeProcessor(expr.getBindingIfIdentifier(), processorFactory, selectorFactory); } else { return makeProcessorInternal( factory -> exprTypeHint, diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java b/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java index 6f2225d8bcf..dddd61814ae 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java @@ -113,9 +113,9 @@ public class JoinConditionAnalysis if (isLeftExprAndRightColumn(lhs, rhs, rightPrefix)) { // rhs is a right-hand column; lhs is an expression solely of the left-hand side. 
- equiConditions.add(new Equality(lhs, rhs.getIdentifierIfIdentifier().substring(rightPrefix.length()))); + equiConditions.add(new Equality(lhs, rhs.getBindingIfIdentifier().substring(rightPrefix.length()))); } else if (isLeftExprAndRightColumn(rhs, lhs, rightPrefix)) { - equiConditions.add(new Equality(rhs, lhs.getIdentifierIfIdentifier().substring(rightPrefix.length()))); + equiConditions.add(new Equality(rhs, lhs.getBindingIfIdentifier().substring(rightPrefix.length()))); } else { nonEquiConditions.add(childExpr); } @@ -128,8 +128,8 @@ public class JoinConditionAnalysis private static boolean isLeftExprAndRightColumn(final Expr a, final Expr b, final String rightPrefix) { return a.analyzeInputs().getRequiredBindings().stream().noneMatch(c -> Joinables.isPrefixedBy(c, rightPrefix)) - && b.getIdentifierIfIdentifier() != null - && Joinables.isPrefixedBy(b.getIdentifierIfIdentifier(), rightPrefix); + && b.getBindingIfIdentifier() != null + && Joinables.isPrefixedBy(b.getBindingIfIdentifier(), rightPrefix); } /** diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java index e988199b293..fc0f2aeca16 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java @@ -41,6 +41,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -98,14 +99,18 @@ public class JoinFilterAnalyzer // build the prefix and equicondition maps // We should check that the prefixes do not duplicate or shadow each other. 
This is not currently implemented, // but this is tracked at https://github.com/apache/druid/issues/9329 - // We should also consider the case where one RHS column is joined to multiple columns: - // https://github.com/apache/druid/issues/9328 - Map equiconditions = new HashMap<>(); + Map> equiconditions = new HashMap<>(); Map prefixes = new HashMap<>(); for (JoinableClause clause : hashJoinSegmentStorageAdapter.getClauses()) { prefixes.put(clause.getPrefix(), clause); for (Equality equality : clause.getCondition().getEquiConditions()) { - equiconditions.put(clause.getPrefix() + equality.getRightColumn(), equality.getLeftExpr()); + Set exprsForRhs = equiconditions.computeIfAbsent( + clause.getPrefix() + equality.getRightColumn(), + (rhs) -> { + return new HashSet<>(); + } + ); + exprsForRhs.add(equality.getLeftExpr()); } } @@ -170,7 +175,7 @@ public class JoinFilterAnalyzer HashJoinSegmentStorageAdapter adapter, Filter filterClause, Map prefixes, - Map equiconditions, + Map> equiconditions, Map>> correlationCache ) { @@ -232,7 +237,7 @@ public class JoinFilterAnalyzer HashJoinSegmentStorageAdapter adapter, OrFilter orFilter, Map prefixes, - Map equiconditions, + Map> equiconditions, Map>> correlationCache ) { @@ -294,7 +299,7 @@ public class JoinFilterAnalyzer HashJoinSegmentStorageAdapter baseAdapter, SelectorFilter selectorFilter, Map prefixes, - Map equiconditions, + Map> equiconditions, Map>> correlationCache ) { @@ -459,57 +464,34 @@ public class JoinFilterAnalyzer HashJoinSegmentStorageAdapter adapter, String tablePrefix, JoinableClause clauseForTablePrefix, - Map equiConditions + Map> equiConditions ) { JoinConditionAnalysis jca = clauseForTablePrefix.getCondition(); - List rhsColumns = new ArrayList<>(); + Set rhsColumns = new HashSet<>(); for (Equality eq : jca.getEquiConditions()) { rhsColumns.add(tablePrefix + eq.getRightColumn()); } List correlations = new ArrayList<>(); - for (String rhsColumn : rhsColumns) { - List correlatedBaseColumns = new 
ArrayList<>(); - List correlatedBaseExpressions = new ArrayList<>(); - boolean terminate = false; - String findMappingFor = rhsColumn; - while (!terminate) { - Expr lhs = equiConditions.get(findMappingFor); - if (lhs == null) { - break; - } + Set correlatedBaseColumns = new HashSet<>(); + Set correlatedBaseExpressions = new HashSet<>(); - String identifier = lhs.getBindingIfIdentifier(); - if (identifier == null) { - // We push down if the function only requires base table columns - Expr.BindingDetails bindingDetails = lhs.analyzeInputs(); - Set requiredBindings = bindingDetails.getRequiredBindings(); - if (!requiredBindings.stream().allMatch(requiredBinding -> adapter.isBaseColumn(requiredBinding))) { - return Optional.empty(); - } - - terminate = true; - correlatedBaseExpressions.add(lhs); - } else { - // simple identifier, see if we can correlate it with a column on the base table - findMappingFor = identifier; - if (adapter.isBaseColumn(identifier)) { - terminate = true; - correlatedBaseColumns.add(findMappingFor); - } - } - } + getCorrelationForRHSColumn( + adapter, + equiConditions, + rhsColumn, + correlatedBaseColumns, + correlatedBaseExpressions + ); if (correlatedBaseColumns.isEmpty() && correlatedBaseExpressions.isEmpty()) { return Optional.empty(); } - // We should merge correlation analyses if they're for the same rhsColumn - // See https://github.com/apache/druid/issues/9328 correlations.add( new JoinFilterColumnCorrelationAnalysis( rhsColumn, @@ -519,7 +501,94 @@ public class JoinFilterAnalyzer ); } - return Optional.of(correlations); + List dedupCorrelations = eliminateCorrelationDuplicates(correlations); + + return Optional.of(dedupCorrelations); + } + + /** + * Helper method for {@link #findCorrelatedBaseTableColumns} that determines correlated base table columns + * and/or expressions for a single RHS column and adds them to the provided sets as it traverses the + * equicondition column relationships. 
+ * + * @param adapter The adapter for the join. Used to determine if a column is a base table column. + * @param equiConditions Map of equiconditions, keyed by the right hand columns + * @param rhsColumn RHS column to find base table correlations for + * @param correlatedBaseColumns Set of correlated base column names for the provided RHS column. Will be modified. + * @param correlatedBaseExpressions Set of correlated base column expressions for the provided RHS column. Will be + * modified. + */ + private static void getCorrelationForRHSColumn( + HashJoinSegmentStorageAdapter adapter, + Map> equiConditions, + String rhsColumn, + Set correlatedBaseColumns, + Set correlatedBaseExpressions + ) + { + String findMappingFor = rhsColumn; + Set lhsExprs = equiConditions.get(findMappingFor); + if (lhsExprs == null) { + return; + } + + for (Expr lhsExpr : lhsExprs) { + String identifier = lhsExpr.getBindingIfIdentifier(); + if (identifier == null) { + // We push down if the function only requires base table columns; otherwise skip only this expression + Expr.BindingDetails bindingDetails = lhsExpr.analyzeInputs(); + Set requiredBindings = bindingDetails.getRequiredBindings(); + if (!requiredBindings.stream().allMatch(requiredBinding -> adapter.isBaseColumn(requiredBinding))) { + continue; + } + correlatedBaseExpressions.add(lhsExpr); + } else { + // simple identifier, see if we can correlate it with a column on the base table + findMappingFor = identifier; + if (adapter.isBaseColumn(identifier)) { + correlatedBaseColumns.add(findMappingFor); + } else { + getCorrelationForRHSColumn( + adapter, + equiConditions, + findMappingFor, + correlatedBaseColumns, + correlatedBaseExpressions + ); + } + } + } + } + + /** + * Given a list of JoinFilterColumnCorrelationAnalysis, prune the list so that we only have one + * JoinFilterColumnCorrelationAnalysis for each unique combination of base columns. 
+ * + * Suppose we have a join condition like the following, where A is the base table: + * A.joinColumn == B.joinColumn && A.joinColumn == B.joinColumn2 + * + * We only need to consider one correlation to A.joinColumn since B.joinColumn and B.joinColumn2 must + * have the same value in any row that matches the join condition. + * + * In the future this method could consider which column correlation should be preserved based on availability of + * indices and other heuristics. + * + * When push down of filters with LHS expressions in the join condition is supported, this method should also + * consider expressions. + * + * @param originalList Original list of column correlation analyses. + * @return Pruned list of column correlation analyses. + */ + private static List eliminateCorrelationDuplicates( + List originalList + ) + { + Map, JoinFilterColumnCorrelationAnalysis> uniquesMap = new HashMap<>(); + for (JoinFilterColumnCorrelationAnalysis jca : originalList) { + uniquesMap.put(jca.getBaseColumns(), jca); + } + + return new ArrayList<>(uniquesMap.values()); } private static boolean filterMatchesNull(Filter filter) diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterColumnCorrelationAnalysis.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterColumnCorrelationAnalysis.java index fb569b152bc..c27d13c8683 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterColumnCorrelationAnalysis.java +++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterColumnCorrelationAnalysis.java @@ -21,7 +21,9 @@ package org.apache.druid.segment.join.filter; import org.apache.druid.math.expr.Expr; +import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * Represents an analysis of what base table columns, if any, can be correlated with a column that will @@ -38,13 +40,14 @@ public class JoinFilterColumnCorrelationAnalysis public 
JoinFilterColumnCorrelationAnalysis( String joinColumn, - List baseColumns, - List baseExpressions + Set baseColumns, + Set baseExpressions ) { this.joinColumn = joinColumn; - this.baseColumns = baseColumns; - this.baseExpressions = baseExpressions; + this.baseColumns = new ArrayList<>(baseColumns); + this.baseExpressions = new ArrayList<>(baseExpressions); + this.baseColumns.sort(String.CASE_INSENSITIVE_ORDER); } public String getJoinColumn() diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java index 5455fdd43ee..876ec8ed2bd 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapterTest.java @@ -47,7 +47,7 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag public void test_getInterval_factToCountry() { Assert.assertEquals( - Intervals.of("2015-09-12/2015-09-12T04:43:40.060Z"), + Intervals.of("2015-09-12/2015-09-12T05:21:00.060Z"), makeFactToCountrySegment().getInterval() ); } @@ -88,7 +88,7 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag public void test_getDimensionCardinality_factToCountryFactColumn() { Assert.assertEquals( - 17, + 18, makeFactToCountrySegment().getDimensionCardinality("countryIsoCode") ); } @@ -97,7 +97,7 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag public void test_getDimensionCardinality_factToCountryJoinColumn() { Assert.assertEquals( - 17, + 18, makeFactToCountrySegment().getDimensionCardinality(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName") ); } @@ -133,7 +133,7 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag public void test_getMaxTime_factToCountry() { Assert.assertEquals( - 
DateTimes.of("2015-09-12T04:43:40.059Z"), + DateTimes.of("2015-09-12T05:21:00.059Z"), makeFactToCountrySegment().getMaxTime() ); } @@ -268,7 +268,7 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag public void test_getMaxIngestedEventTime_factToCountry() { Assert.assertEquals( - DateTimes.of("2015-09-12T04:43:40.059Z"), + DateTimes.of("2015-09-12T05:21:00.059Z"), makeFactToCountrySegment().getMaxIngestedEventTime() ); } @@ -341,7 +341,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L}, new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}, - new Object[]{"Orange Soda", "MatchNothing", null, null, NULL_COUNTRY} + new Object[]{"Orange Soda", "MatchNothing", null, null, NULL_COUNTRY}, + new Object[]{"History of Fourems", "MMMM", "MMMM", "Fourems", 205L} ) ); } @@ -390,7 +391,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L}, new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L}, new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, - new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}, + new Object[]{"History of Fourems", "MMMM", "MMMM", "Fourems", 205L} ) ); } @@ -438,7 +440,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway"}, new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador"}, new Object[]{"Old Anatolian Turkish", "US", "US", "United States"}, - new Object[]{"Cream Soda", "SU", "SU", "States United"} + new Object[]{"Cream Soda", "SU", "SU", "States United"}, + new Object[]{"History of Fourems", "MMMM", "MMMM", 
"Fourems"} ) ); } @@ -480,7 +483,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Giusy Ferreri discography", "IT", "IT", "Italy", 7L}, new Object[]{"Roma-Bangkok", "IT", "IT", "Italy", 7L}, new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, - new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}, + new Object[]{"History of Fourems", "MMMM", "MMMM", "Fourems", 205L} ) : ImmutableList.of( new Object[]{"Talk:Oswald Tilghman", null, "AU", "Australia", 0L}, @@ -494,7 +498,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Giusy Ferreri discography", "IT", "IT", "Italy", 7L}, new Object[]{"Roma-Bangkok", "IT", "IT", "Italy", 7L}, new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, - new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}, + new Object[]{"History of Fourems", "MMMM", "MMMM", "Fourems", 205L} ) ); } @@ -534,7 +539,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Giusy Ferreri discography", "IT", "Italy"}, new Object[]{"Roma-Bangkok", "IT", "Italy"}, new Object[]{"Old Anatolian Turkish", "US", "United States"}, - new Object[]{"Cream Soda", "SU", "States United"} + new Object[]{"Cream Soda", "SU", "States United"}, + new Object[]{"History of Fourems", "MMMM", "Fourems"} ) : ImmutableList.of( new Object[]{"Talk:Oswald Tilghman", null, "Australia"}, @@ -548,7 +554,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Giusy Ferreri discography", "IT", "Italy"}, new Object[]{"Roma-Bangkok", "IT", "Italy"}, new Object[]{"Old Anatolian Turkish", "US", "United States"}, - new Object[]{"Cream Soda", "SU", "States United"} + new Object[]{"Cream Soda", "SU", "States United"}, + 
new Object[]{"History of Fourems", "MMMM", "Fourems"} ) ); } @@ -800,7 +807,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Алиса в Зазеркалье", "NO", "NO", "Norway", 11L}, new Object[]{"Gabinete Ministerial de Rafael Correa", "EC", "EC", "Ecuador", 4L}, new Object[]{"Old Anatolian Turkish", "US", "US", "United States", 13L}, - new Object[]{"Cream Soda", "SU", "SU", "States United", 15L} + new Object[]{"Cream Soda", "SU", "SU", "States United", 15L}, + new Object[]{"History of Fourems", "MMMM", "MMMM", "Fourems", 205L} ) ); } @@ -856,7 +864,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Gabinete Ministerial de Rafael Correa", "Provincia del Guayas", "Ecuador"}, new Object[]{"Old Anatolian Turkish", "Virginia", "United States"}, new Object[]{"Cream Soda", "Ainigriv", "States United"}, - new Object[]{"Orange Soda", null, null} + new Object[]{"Orange Soda", null, null}, + new Object[]{"History of Fourems", "Fourems Province", "Fourems"} ) ); } @@ -908,7 +917,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Diskussion:Sebastian Schulz", "United States"}, new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"}, new Object[]{"Diskussion:Sebastian Schulz", "States United"}, - new Object[]{"Diskussion:Sebastian Schulz", "Usca"} + new Object[]{"Diskussion:Sebastian Schulz", "Usca"}, + new Object[]{"Diskussion:Sebastian Schulz", "Fourems"} ) ); } @@ -996,7 +1006,8 @@ public class HashJoinSegmentStorageAdapterTest extends BaseHashJoinSegmentStorag new Object[]{"Diskussion:Sebastian Schulz", "United States"}, new Object[]{"Diskussion:Sebastian Schulz", "Atlantis"}, new Object[]{"Diskussion:Sebastian Schulz", "States United"}, - new Object[]{"Diskussion:Sebastian Schulz", "Usca"} + new Object[]{"Diskussion:Sebastian Schulz", "Usca"}, + new Object[]{"Diskussion:Sebastian Schulz", "Fourems"} ) ); } diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java index 78c057fcbd0..3a9e37b1355 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java @@ -98,7 +98,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes new Object[]{"Roma-Bangkok", "Provincia di Varese", "Italy"}, new Object[]{"Old Anatolian Turkish", "Virginia", "United States"}, new Object[]{"Cream Soda", "Ainigriv", "States United"}, - new Object[][]{new Object[]{"Orange Soda", null, null}} + new Object[]{"Orange Soda", null, null}, + new Object[]{"History of Fourems", "Fourems Province", "Fourems"} ) ); } @@ -400,7 +401,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes new Object[]{"Roma-Bangkok", "Provincia di Varese", "Italy"}, new Object[]{"Old Anatolian Turkish", "Virginia", "United States"}, new Object[]{"Cream Soda", "Ainigriv", "States United"}, - new Object[][]{new Object[]{"Orange Soda", null, null}} + new Object[]{"Orange Soda", null, null}, + new Object[]{"History of Fourems", "Fourems Province", "Fourems"} ) ); } @@ -737,8 +739,8 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes new SelectorFilter("channel", "#ko.wikipedia"), new AndFilter( ImmutableList.of( - new InDimFilter("regionIsoCode", ImmutableSet.of("VA"), null, null).toFilter(), - new InDimFilter("countryIsoCode", ImmutableSet.of("US"), null, null).toFilter() + new InDimFilter("countryIsoCode", ImmutableSet.of("US"), null, null).toFilter(), + new InDimFilter("regionIsoCode", ImmutableSet.of("VA"), null, null).toFilter() ) ) ) @@ -1201,13 +1203,9 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes ); } - @Test public void 
test_filterPushDown_factToRegionTwoColumnsToOneRHSColumnAndFilterOnRHS() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Cannot build hash-join matcher on non-key-based condition: Equality{leftExpr=countryIsoCode, rightColumn='regionIsoCode_0'}"); - JoinableClause factExprToRegon = new JoinableClause( FACT_TO_REGION_PREFIX, new IndexedTableJoinable(regionsTable), @@ -1229,11 +1227,16 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes factExprToRegon ) ); - Filter originalFilter = new SelectorFilter("r1.regionName", "Blah"); + Filter originalFilter = new SelectorFilter("r1.regionName", "Fourems Province"); JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( - null, - new SelectorFilter("r1.regionName", "Blah"), + new AndFilter( + ImmutableList.of( + new InDimFilter("countryIsoCode", ImmutableSet.of("MMMM"), null, null).toFilter(), + new InDimFilter("regionIsoCode", ImmutableSet.of("MMMM"), null, null).toFilter() + ) + ), + new SelectorFilter("r1.regionName", "Fourems Province"), ImmutableList.of() ); JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( @@ -1254,11 +1257,69 @@ public class JoinFilterAnalyzerTest extends BaseHashJoinSegmentStorageAdapterTes ), ImmutableList.of( "page", - FACT_TO_REGION_PREFIX + "regionName", - REGION_TO_COUNTRY_PREFIX + "countryName" + FACT_TO_REGION_PREFIX + "regionName" ), + ImmutableList.of( + new Object[]{"History of Fourems", "Fourems Province"} + ) + ); + } + + @Test + public void test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS() + { + JoinableClause factExprToRegon = new JoinableClause( + FACT_TO_REGION_PREFIX, + new IndexedTableJoinable(regionsTable), + JoinType.LEFT, + JoinConditionAnalysis.forExpression( + StringUtils.format( + "\"%sregionIsoCode\" == regionIsoCode && \"%scountryIsoCode\" == regionIsoCode", + FACT_TO_REGION_PREFIX, + FACT_TO_REGION_PREFIX + ), + FACT_TO_REGION_PREFIX, + 
ExprMacroTable.nil() + ) + ); + + HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter( + factSegment.asStorageAdapter(), + ImmutableList.of( + factExprToRegon + ) + ); + Filter originalFilter = new SelectorFilter("r1.regionName", "Fourems Province"); + + JoinFilterSplit expectedFilterSplit = new JoinFilterSplit( + new InDimFilter("regionIsoCode", ImmutableSet.of("MMMM"), null, null).toFilter(), + new SelectorFilter("r1.regionName", "Fourems Province"), ImmutableList.of() ); + JoinFilterSplit actualFilterSplit = JoinFilterAnalyzer.splitFilter( + adapter, + originalFilter, + true + ); + Assert.assertEquals(expectedFilterSplit, actualFilterSplit); + + JoinTestHelper.verifyCursors( + adapter.makeCursors( + originalFilter, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ), + ImmutableList.of( + "page", + FACT_TO_REGION_PREFIX + "regionName" + ), + ImmutableList.of( + new Object[]{"History of Fourems", "Fourems Province"} + ) + ); } @Test diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java index 8bcbe5ea82c..92fedcc499b 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java @@ -93,7 +93,7 @@ public class RowBasedIndexedTableTest @Test public void test_numRows_countries() { - Assert.assertEquals(17, countriesTable.numRows()); + Assert.assertEquals(18, countriesTable.numRows()); } @Test diff --git a/processing/src/test/resources/wikipedia/countries.json b/processing/src/test/resources/wikipedia/countries.json index 6c71bb3bc47..858a58ca2d6 100644 --- a/processing/src/test/resources/wikipedia/countries.json +++ b/processing/src/test/resources/wikipedia/countries.json @@ -15,3 +15,4 @@ 
{"countryNumber":14,"countryIsoCode":"AX","countryName":"Atlantis"} {"countryNumber":15,"countryIsoCode":"SU","countryName":"States United"} {"countryNumber":16,"countryIsoCode":"USCA","countryName":"Usca"} +{"countryNumber":205,"countryIsoCode":"MMMM","countryName":"Fourems"} diff --git a/processing/src/test/resources/wikipedia/data.json b/processing/src/test/resources/wikipedia/data.json index eac2911afad..69283cf9f0c 100644 --- a/processing/src/test/resources/wikipedia/data.json +++ b/processing/src/test/resources/wikipedia/data.json @@ -26,3 +26,4 @@ {"time":"2015-09-12T02:33:40.059Z","channel":"#en.wikipedia","regionIsoCode":"VA","countryNumber":13,"countryIsoCode":"US","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Old Anatolian Turkish","namespace":"Main"} {"time":"2015-09-12T03:43:40.059Z","channel":"#en.wikipedia","regionIsoCode":"AV","countryNumber":15,"countryIsoCode":"SU","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Cream Soda","namespace":"Main"} {"time":"2015-09-12T04:43:40.059Z","channel":"#en.wikipedia","regionIsoCode":"MatchNothing","countryNumber":99,"countryIsoCode":"MatchNothing","user":"68.100.166.227","delta":14,"isRobot":false,"isAnonymous":true,"page":"Orange Soda","namespace":"Main"} +{"time":"2015-09-12T05:21:00.059Z","channel":"#en.wikipedia","regionIsoCode":"MMMM","countryNumber":205,"countryIsoCode":"MMMM","user":"SomeUser","delta":14,"isRobot":false,"isAnonymous":true,"page":"History of Fourems","namespace":"Main"} diff --git a/processing/src/test/resources/wikipedia/regions.json b/processing/src/test/resources/wikipedia/regions.json index 6d44b519f85..3c74579df0d 100644 --- a/processing/src/test/resources/wikipedia/regions.json +++ b/processing/src/test/resources/wikipedia/regions.json @@ -17,3 +17,4 @@ {"regionIsoCode":"VA","countryIsoCode":"US","regionName":"Virginia"} {"regionIsoCode":"AV","countryIsoCode":"SU","regionName":"Ainigriv"} 
{"regionIsoCode":"ZZ","countryIsoCode":"USCA","regionName":"Usca City"} +{"regionIsoCode":"MMMM","countryIsoCode":"MMMM","regionName":"Fourems Province"}