Add join prefix duplicate/shadowing check (#9384)

* Add join prefix duplicate/shadowing check

* Fix format string

* PR comments

* PR comment

* Optimize loop PR comment
This commit is contained in:
Jonathan Wei 2020-02-25 18:17:23 -08:00 committed by GitHub
parent d771b42ed1
commit 5ce9c81b68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 11 deletions

View File

@ -42,6 +42,12 @@ public class HashJoinSegment extends AbstractSegment
private final List<JoinableClause> clauses;
private final boolean enableFilterPushDown;
/**
* @param baseSegment The left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param enableFilterPushDown Whether to enable filter push down optimizations to the base segment
*/
public HashJoinSegment(
Segment baseSegment,
List<JoinableClause> clauses,

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.join;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.granularity.Granularity;
@ -56,16 +57,12 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
private final List<JoinableClause> clauses;
private final boolean enableFilterPushDown;
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses
)
{
this.baseAdapter = baseAdapter;
this.clauses = clauses;
this.enableFilterPushDown = QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN;
}
/**
* @param baseAdapter A StorageAdapter for the left-hand side base segment
* @param clauses The right-hand side clauses. The caller is responsible for ensuring that there are no
* duplicate prefixes or prefixes that shadow each other across the clauses
* @param enableFilterPushDown Whether to enable filter push down optimizations to the base segment
*/
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses,
@ -77,6 +74,15 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
this.enableFilterPushDown = enableFilterPushDown;
}
@VisibleForTesting
HashJoinSegmentStorageAdapter(
StorageAdapter baseAdapter,
List<JoinableClause> clauses
)
{
this(baseAdapter, clauses, QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN);
}
@Override
public Interval getInterval()
{

View File

@ -27,6 +27,8 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
@ -38,6 +40,9 @@ import java.util.stream.Collectors;
*/
public class Joinables
{
private static final Comparator<String> DESCENDING_LENGTH_STRING_COMPARATOR = (s1, s2) ->
Integer.compare(s2.length(), s1.length());
/**
* Checks that "prefix" is a valid prefix for a join clause (see {@link JoinableClause#getPrefix()}) and, if so,
* returns it. Otherwise, throws an exception.
@ -59,7 +64,7 @@ public class Joinables
public static boolean isPrefixedBy(final String columnName, final String prefix)
{
return columnName.startsWith(prefix) && columnName.length() > prefix.length();
return columnName.length() > prefix.length() && columnName.startsWith(prefix);
}
/**
@ -100,6 +105,9 @@ public class Joinables
final JoinableFactory joinableFactory
)
{
// Since building a JoinableClause can be expensive, check for prefix conflicts before building
checkPreJoinableClausesForDuplicatesAndShadowing(clauses);
return clauses.stream().map(preJoinableClause -> {
final Optional<Joinable> joinable = joinableFactory.build(
preJoinableClause.getDataSource(),
@ -114,4 +122,42 @@ public class Joinables
);
}).collect(Collectors.toList());
}
private static void checkPreJoinableClausesForDuplicatesAndShadowing(
final List<PreJoinableClause> preJoinableClauses
)
{
List<String> prefixes = new ArrayList<>();
for (PreJoinableClause clause : preJoinableClauses) {
prefixes.add(clause.getPrefix());
}
checkPrefixesForDuplicatesAndShadowing(prefixes);
}
/**
* Check if any prefixes in the provided list duplicate or shadow each other.
*
* @param prefixes A mutable list containing the prefixes to check. This list will be sorted by descending
* string length.
*/
public static void checkPrefixesForDuplicatesAndShadowing(
final List<String> prefixes
)
{
// this is a naive approach that assumes we'll typically handle only a small number of prefixes
prefixes.sort(DESCENDING_LENGTH_STRING_COMPARATOR);
for (int i = 0; i < prefixes.size(); i++) {
String prefix = prefixes.get(i);
for (int k = i + 1; k < prefixes.size(); k++) {
String otherPrefix = prefixes.get(k);
if (prefix.equals(otherPrefix)) {
throw new IAE("Detected duplicate prefix in join clauses: [%s]", prefix);
}
if (isPrefixedBy(prefix, otherPrefix)) {
throw new IAE("Detected conflicting prefixes in join clauses: [%s, %s]", prefix, otherPrefix);
}
}
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts;
@ -34,6 +35,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -156,4 +159,53 @@ public class JoinablesTest
Assert.assertNotSame(Function.identity(), segmentMapFn);
}
@Test
public void test_checkClausePrefixesForDuplicatesAndShadowing_noConflicts()
{
List<String> prefixes = Arrays.asList(
"AA",
"AB",
"AC",
"aa",
"ab",
"ac",
"BA"
);
Joinables.checkPrefixesForDuplicatesAndShadowing(prefixes);
}
@Test
public void test_checkClausePrefixesForDuplicatesAndShadowing_duplicate()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Detected duplicate prefix in join clauses: [AA]");
List<String> prefixes = Arrays.asList(
"AA",
"AA",
"ABCD"
);
Joinables.checkPrefixesForDuplicatesAndShadowing(prefixes);
}
@Test
public void test_checkClausePrefixesForDuplicatesAndShadowing_shadowing()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Detected conflicting prefixes in join clauses: [ABC.DEF, ABC.]");
List<String> prefixes = Arrays.asList(
"BASE.",
"BASEBALL",
"123.456",
"23.45",
"ABC.",
"ABC.DEF"
);
Joinables.checkPrefixesForDuplicatesAndShadowing(prefixes);
}
}