Push join build table values as filter in case of duplicates (#12225)

* Push join build table values as filter

* Add tests for JoinableFactoryWrapper

* fixup! Push join build table values as filter

* fixup! Add tests for JoinableFactoryWrapper

* fixup! Push join build table values as filter
This commit is contained in:
Rohan Garg 2022-06-14 05:48:27 +05:30 committed by GitHub
parent 27e8b43673
commit afaea251f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 275 additions and 57 deletions

View File

@ -86,8 +86,11 @@ public interface Joinable extends ReferenceCountedObject
);
/**
* Returns all nonnull values from a particular column if they are all unique, if there are "maxNumValues" or fewer,
* and if the column exists and supports this operation. Otherwise, returns an empty Optional.
* Returns all non-null values from a particular column along with a flag that tells whether they are all unique in
* the column. If there are more than "maxNumValues" non-null values, or if the column doesn't exist or doesn't
* support this operation, returns an object with an empty set for column values and false for the uniqueness flag.
* The uniqueness flag will only be true if we've collected all non-null values in the column and found that they're
* all unique. In all other cases it will be false.
*
* The returned set may be passed to {@link org.apache.druid.query.filter.InDimFilter}. For efficiency,
* implementations should prefer creating the returned set with
@ -96,7 +99,7 @@ public interface Joinable extends ReferenceCountedObject
* @param columnName name of the column
* @param maxNumValues maximum number of values to return
*/
Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName, int maxNumValues);
ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, int maxNumValues);
/**
* Searches a column from this Joinable for a particular value, finds rows that match,
@ -125,4 +128,27 @@ public interface Joinable extends ReferenceCountedObject
long maxCorrelationSetSize,
boolean allowNonKeyColumnSearch
);
/**
 * Immutable holder pairing a set of column values with a flag that says whether those values were all unique in
 * the column they were read from. It is the return type of {@code getNonNullColumnValues}.
 *
 * The set passed to the constructor is stored and returned as-is; no copy is made.
 */
class ColumnValuesWithUniqueFlag
{
  // Kept private so the getters are the only way to read the state.
  private final Set<String> columnValues;
  private final boolean allUnique;

  /**
   * @param columnValues the collected column values
   * @param allUnique    whether the collected values were all unique in the column
   */
  public ColumnValuesWithUniqueFlag(Set<String> columnValues, boolean allUnique)
  {
    this.columnValues = columnValues;
    this.allUnique = allUnique;
  }

  /**
   * @return the same set instance that was given to the constructor
   */
  public Set<String> getColumnValues()
  {
    return columnValues;
  }

  /**
   * @return the uniqueness flag that was given to the constructor
   */
  public boolean isAllUnique()
  {
    return allUnique;
  }
}
}

View File

@ -28,7 +28,6 @@ import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
@ -38,6 +37,7 @@ import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.filter.FalseFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
@ -228,16 +228,19 @@ public class JoinableFactoryWrapper
columnsRequiredByJoinClauses.remove(column, 1);
}
final Optional<Filter> filter =
final JoinClauseToFilterConversion joinClauseToFilterConversion =
convertJoinToFilter(
clause,
Sets.union(requiredColumns, columnsRequiredByJoinClauses.elementSet()),
maxNumFilterValues
);
if (filter.isPresent()) {
filterList.add(filter.get());
} else {
// add the converted filter to the filter list
if (joinClauseToFilterConversion.getConvertedFilter() != null) {
filterList.add(joinClauseToFilterConversion.getConvertedFilter());
}
// if the converted filter is partial, keep the join clause too
if (!joinClauseToFilterConversion.isJoinClauseFullyConverted()) {
clausesToUse.add(clause);
atStart = false;
}
@ -246,11 +249,6 @@ public class JoinableFactoryWrapper
}
}
// Sanity check. If this exception is ever thrown, it's a bug.
if (filterList.size() + clausesToUse.size() != clauses.size()) {
throw new ISE("Lost a join clause during planning");
}
return Pair.of(filterList, clausesToUse);
}
@ -260,11 +258,17 @@ public class JoinableFactoryWrapper
* The requirements are:
*
* - it must be an INNER equi-join
* - the right-hand columns referenced by the condition must not have any duplicate values
* - the right-hand columns referenced by the condition must not have any duplicate values. If there are duplicate
* values in a column, the join is still converted to a filter where possible, but the join clause is kept on top
* as well so that results stay correct.
* - no columns from the right-hand side can appear in "requiredColumns"
*
* @return {@link JoinClauseToFilterConversion} object which contains the converted filter for the clause and a boolean
* to represent whether the converted filter encapsulates the whole clause or not. More semantics of the object are
* present in the class level docs.
*/
@VisibleForTesting
static Optional<Filter> convertJoinToFilter(
static JoinClauseToFilterConversion convertJoinToFilter(
final JoinableClause clause,
final Set<String> requiredColumns,
final int maxNumFilterValues
@ -276,28 +280,74 @@ public class JoinableFactoryWrapper
&& clause.getCondition().getEquiConditions().size() > 0) {
final List<Filter> filters = new ArrayList<>();
int numValues = maxNumFilterValues;
boolean joinClauseFullyConverted = true;
for (final Equality condition : clause.getCondition().getEquiConditions()) {
final String leftColumn = condition.getLeftExpr().getBindingIfIdentifier();
if (leftColumn == null) {
return Optional.empty();
return new JoinClauseToFilterConversion(null, false);
}
final Optional<Set<String>> columnValuesForFilter =
clause.getJoinable().getNonNullColumnValuesIfAllUnique(condition.getRightColumn(), numValues);
Joinable.ColumnValuesWithUniqueFlag columnValuesWithUniqueFlag =
clause.getJoinable().getNonNullColumnValues(condition.getRightColumn(), numValues);
// For an empty values set, isAllUnique flag will be true only if the column had no non-null values.
if (columnValuesWithUniqueFlag.getColumnValues().isEmpty()) {
if (columnValuesWithUniqueFlag.isAllUnique()) {
return new JoinClauseToFilterConversion(FalseFilter.instance(), true);
} else {
joinClauseFullyConverted = false;
}
continue;
}
if (columnValuesForFilter.isPresent()) {
numValues -= columnValuesForFilter.get().size();
filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesForFilter.get())));
} else {
return Optional.empty();
numValues -= columnValuesWithUniqueFlag.getColumnValues().size();
filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues())));
if (!columnValuesWithUniqueFlag.isAllUnique()) {
joinClauseFullyConverted = false;
}
}
return Optional.of(Filters.and(filters));
return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted);
}
return Optional.empty();
return new JoinClauseToFilterConversion(null, false);
}
/**
 * Outcome of attempting to turn a single joinable clause into a filter.
 *
 * convertedFilter is the filter produced by the conversion, or null if none was produced.
 * joinClauseFullyConverted tells whether convertedFilter can replace the whole joinable clause.
 *
 * The two fields are meant to be read together:
 *
 * - convertedFilter is non-null and joinClauseFullyConverted is true: the filter stands in for the entire clause and
 *   the clause can be dropped. This includes the case where the right-hand column of a condition has no non-null
 *   values; the converted filter is then a FalseFilter.
 * - convertedFilter is non-null and joinClauseFullyConverted is false: the filter covers only part of the clause, so
 *   the clause must be kept in addition to the filter.
 * - convertedFilter is null and joinClauseFullyConverted is false: no part of the clause could be turned into a
 *   filter, so the clause is kept unchanged.
 */
private static class JoinClauseToFilterConversion
{
  @Nullable
  private final Filter convertedFilter;
  private final boolean joinClauseFullyConverted;

  public JoinClauseToFilterConversion(@Nullable Filter convertedFilter, boolean joinClauseFullyConverted)
  {
    this.convertedFilter = convertedFilter;
    this.joinClauseFullyConverted = joinClauseFullyConverted;
  }

  /**
   * @return the filter generated from the clause, or null if no filter was generated
   */
  @Nullable
  public Filter getConvertedFilter()
  {
    return convertedFilter;
  }

  /**
   * @return true if the converted filter can replace the whole joinable clause
   */
  public boolean isJoinClauseFullyConverted()
  {
    return joinClauseFullyConverted;
  }
}
}

View File

@ -96,7 +96,7 @@ public class LookupJoinable implements Joinable
}
@Override
public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName, int maxNumValues)
public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, int maxNumValues)
{
if (LookupColumnSelectorFactory.KEY_COLUMN.equals(columnName) && extractor.canGetKeySet()) {
final Set<String> keys = extractor.keySet();
@ -117,14 +117,14 @@ public class LookupJoinable implements Joinable
}
if (nonNullKeys > maxNumValues) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
} else if (nonNullKeys == keys.size()) {
return Optional.of(keys);
return new ColumnValuesWithUniqueFlag(keys, true);
} else {
return Optional.of(Sets.difference(keys, nullEquivalentValues));
return new ColumnValuesWithUniqueFlag(Sets.difference(keys, nullEquivalentValues), true);
}
} else {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.join.table;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.guava.Comparators;
@ -92,35 +93,36 @@ public class IndexedTableJoinable implements Joinable
}
@Override
public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(final String columnName, final int maxNumValues)
public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, final int maxNumValues)
{
final int columnPosition = table.rowSignature().indexOf(columnName);
if (columnPosition < 0) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
}
try (final IndexedTable.Reader reader = table.columnReader(columnPosition)) {
// Sorted set to encourage "in" filters that result from this method to do dictionary lookups in order.
// The hopes are that this will improve locality and therefore improve performance.
final Set<String> allValues = createValuesSet();
boolean allUnique = true;
for (int i = 0; i < table.numRows(); i++) {
final String s = DimensionHandlerUtils.convertObjectToString(reader.read(i));
if (!NullHandling.isNullOrEquivalent(s)) {
if (!allValues.add(s)) {
// Duplicate found. Since the values are not all unique, we must return an empty Optional.
return Optional.empty();
// Duplicate found
allUnique = false;
}
if (allValues.size() > maxNumValues) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
}
}
}
return Optional.of(allValues);
return new ColumnValuesWithUniqueFlag(allValues, allUnique);
}
catch (IOException e) {
throw new RuntimeException(e);

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@ -33,6 +34,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.TestQuery;
@ -45,7 +47,13 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.FalseFilter;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.join.table.RowBasedIndexedTable;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
@ -61,6 +69,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
public class JoinableFactoryWrapperTest extends NullHandlingTest
{
@ -81,6 +90,40 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
? TEST_LOOKUP.keySet()
: Sets.difference(TEST_LOOKUP.keySet(), Collections.singleton(""));
private static final InlineDataSource INDEXED_TABLE_DS = InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{"Mexico"},
new Object[]{"Norway"},
new Object[]{"El Salvador"},
new Object[]{"United States"},
new Object[]{"United States"}
),
RowSignature.builder().add("country", ColumnType.STRING).build()
);
private static final InlineDataSource NULL_INDEXED_TABLE_DS = InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{null}
),
RowSignature.builder().add("nullCol", ColumnType.STRING).build()
);
private static final IndexedTable TEST_INDEXED_TABLE = new RowBasedIndexedTable<>(
INDEXED_TABLE_DS.getRowsAsList(),
INDEXED_TABLE_DS.rowAdapter(),
INDEXED_TABLE_DS.getRowSignature(),
ImmutableSet.of("country"),
DateTimes.nowUtc().toString()
);
private static final IndexedTable TEST_NULL_INDEXED_TABLE = new RowBasedIndexedTable<>(
NULL_INDEXED_TABLE_DS.getRowsAsList(),
NULL_INDEXED_TABLE_DS.rowAdapter(),
NULL_INDEXED_TABLE_DS.getRowSignature(),
ImmutableSet.of("nullCol"),
DateTimes.nowUtc().toString()
);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -467,6 +510,33 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
);
}
@Test
public void test_convertJoinsToPartialFilters_convertInnerJoin()
{
  // INNER join against the indexed table, whose "country" column lists "United States" twice.
  final JoinableClause clauseWithDuplicates = new JoinableClause(
      "j.",
      new IndexedTableJoinable(TEST_INDEXED_TABLE),
      JoinType.INNER,
      JoinConditionAnalysis.forExpression("x == \"j.country\"", "j.", ExprMacroTable.nil())
  );

  // Distinct values of the table's single column; these are the values expected in the pushed-down filter.
  final Set<String> countries = INDEXED_TABLE_DS.getRowsAsList()
                                                .stream()
                                                .map(row -> row[0].toString())
                                                .collect(Collectors.toSet());

  final Pair<List<Filter>, List<JoinableClause>> conversion = JoinableFactoryWrapper.convertJoinsToFilters(
      ImmutableList.of(clauseWithDuplicates),
      ImmutableSet.of("x"),
      Integer.MAX_VALUE
  );

  Assert.assertEquals(
      Pair.of(
          ImmutableList.of(new InDimFilter("x", countries)),
          // The country column has duplicates, so the join clause is expected to remain next to the filter.
          ImmutableList.of(clauseWithDuplicates)
      ),
      conversion
  );
}
@Test
public void test_convertJoinsToFilters_convertTwoInnerJoins()
{
@ -506,6 +576,51 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
);
}
@Test
public void test_convertJoinsToPartialAndFullFilters_convertMultipleInnerJoins()
{
  // Expected to be fully converted to a filter.
  final JoinableClause lookupClause = new JoinableClause(
      "j.",
      LookupJoinable.wrap(new MapLookupExtractor(TEST_LOOKUP, false)),
      JoinType.INNER,
      JoinConditionAnalysis.forExpression("x == \"j.k\"", "j.", ExprMacroTable.nil())
  );

  // Expected to be only partially converted, because the country column has duplicates.
  final JoinableClause duplicatesClause = new JoinableClause(
      "_j.",
      new IndexedTableJoinable(TEST_INDEXED_TABLE),
      JoinType.INNER,
      JoinConditionAnalysis.forExpression("x == \"_j.country\"", "_j.", ExprMacroTable.nil())
  );

  // Expected not to be converted at all, because it is a LEFT join.
  final JoinableClause leftJoinClause = new JoinableClause(
      "__j.",
      new IndexedTableJoinable(TEST_INDEXED_TABLE),
      JoinType.LEFT,
      JoinConditionAnalysis.forExpression("x == \"__j.country\"", "__j.", ExprMacroTable.nil())
  );

  final ImmutableList<JoinableClause> clauses = ImmutableList.of(lookupClause, duplicatesClause, leftJoinClause);

  // Distinct values of the indexed table's single column.
  final Set<String> countries = INDEXED_TABLE_DS.getRowsAsList()
                                                .stream()
                                                .map(row -> row[0].toString())
                                                .collect(Collectors.toSet());

  final Pair<List<Filter>, List<JoinableClause>> conversion = JoinableFactoryWrapper.convertJoinsToFilters(
      clauses,
      ImmutableSet.of("x"),
      Integer.MAX_VALUE
  );

  Assert.assertEquals(
      Pair.of(
          ImmutableList.of(
              new InDimFilter("x", TEST_LOOKUP_KEYS),
              new InDimFilter("x", countries)
          ),
          ImmutableList.of(duplicatesClause, leftJoinClause)
      ),
      conversion
  );
}
@Test
public void test_convertJoinsToFilters_dontConvertTooManyValues()
{
@ -583,6 +698,31 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
);
}
@Test
public void test_convertJoinsToFilters_convertToFalseFilterWhenOnlyNullValues()
{
  // The joined table has a single row whose only column value is null.
  final IndexedTableJoinable nullOnlyJoinable = new IndexedTableJoinable(TEST_NULL_INDEXED_TABLE);
  final JoinConditionAnalysis condition =
      JoinConditionAnalysis.forExpression("x == \"j.nullCol\"", "j.", ExprMacroTable.nil());
  final JoinableClause clause = new JoinableClause("j.", nullOnlyJoinable, JoinType.INNER, condition);

  final Pair<List<Filter>, List<JoinableClause>> conversion = JoinableFactoryWrapper.convertJoinsToFilters(
      ImmutableList.of(clause),
      ImmutableSet.of("x"),
      Integer.MAX_VALUE
  );

  // Expect a single FalseFilter and no remaining join clauses.
  Assert.assertEquals(
      Pair.of(
          ImmutableList.of(FalseFilter.instance()),
          ImmutableList.of()
      ),
      conversion
  );
}
@Test
public void test_convertJoinsToFilters_dontConvertLhsFunctions()
{

View File

@ -276,38 +276,36 @@ public class LookupJoinableTest extends NullHandlingTest
@Test
public void getNonNullColumnValuesIfAllUniqueForValueColumnShouldReturnEmpty()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique(
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues(
LookupColumnSelectorFactory.VALUE_COLUMN,
Integer.MAX_VALUE
);
Assert.assertEquals(Optional.empty(), values);
Assert.assertEquals(ImmutableSet.of(), values.getColumnValues());
}
@Test
public void getNonNullColumnValuesIfAllUniqueForKeyColumnShouldReturnValues()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique(
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues(
LookupColumnSelectorFactory.KEY_COLUMN,
Integer.MAX_VALUE
);
Assert.assertEquals(
Optional.of(
NullHandling.replaceWithDefault() ? ImmutableSet.of("foo", "bar") : ImmutableSet.of("foo", "bar", "")
),
values
NullHandling.replaceWithDefault() ? ImmutableSet.of("foo", "bar") : ImmutableSet.of("foo", "bar", ""),
values.getColumnValues()
);
}
@Test
public void getNonNullColumnValuesIfAllUniqueForKeyColumnWithLowMaxValuesShouldReturnEmpty()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique(
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues(
LookupColumnSelectorFactory.KEY_COLUMN,
1
);
Assert.assertEquals(Optional.empty(), values);
Assert.assertEquals(ImmutableSet.of(), values.getColumnValues());
}
}

View File

@ -38,6 +38,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinMatcher;
import org.apache.druid.segment.join.Joinable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -347,46 +348,47 @@ public class IndexedTableJoinableTest
@Test
public void getNonNullColumnValuesIfAllUniqueForValueColumnShouldReturnValues()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique(VALUE_COLUMN, Integer.MAX_VALUE);
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues(VALUE_COLUMN, Integer.MAX_VALUE);
Assert.assertEquals(Optional.of(ImmutableSet.of("1", "2")), values);
Assert.assertEquals(ImmutableSet.of("1", "2"), values.getColumnValues());
}
@Test
public void getNonNullColumnValuesIfAllUniqueForNonexistentColumnShouldReturnEmpty()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique("nonexistent", Integer.MAX_VALUE);
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues("nonexistent", Integer.MAX_VALUE);
Assert.assertEquals(Optional.empty(), values);
Assert.assertEquals(ImmutableSet.of(), values.getColumnValues());
}
@Test
public void getNonNullColumnValuesIfAllUniqueForKeyColumnShouldReturnValues()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique(KEY_COLUMN, Integer.MAX_VALUE);
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues(KEY_COLUMN, Integer.MAX_VALUE);
Assert.assertEquals(
Optional.of(ImmutableSet.of("foo", "bar", "baz")),
values
ImmutableSet.of("foo", "bar", "baz"),
values.getColumnValues()
);
}
@Test
public void getNonNullColumnValuesIfAllUniqueForAllSameColumnShouldReturnEmpty()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique(ALL_SAME_COLUMN, Integer.MAX_VALUE);
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues(ALL_SAME_COLUMN, Integer.MAX_VALUE);
Assert.assertEquals(
Optional.empty(),
values
ImmutableSet.of("1"),
values.getColumnValues()
);
Assert.assertFalse(values.isAllUnique());
}
@Test
public void getNonNullColumnValuesIfAllUniqueForKeyColumnWithLowMaxValuesShouldReturnEmpty()
{
final Optional<Set<String>> values = target.getNonNullColumnValuesIfAllUnique(KEY_COLUMN, 1);
final Joinable.ColumnValuesWithUniqueFlag values = target.getNonNullColumnValues(KEY_COLUMN, 1);
Assert.assertEquals(Optional.empty(), values);
Assert.assertEquals(ImmutableSet.of(), values.getColumnValues());
}
}