From 9da42a9ef05bbeefddfc62b4019227fd7c975f93 Mon Sep 17 00:00:00 2001
From: Soumyava Das
Date: Mon, 3 Oct 2022 12:50:13 -0700
Subject: [PATCH] Temp changes refactoring

---
 .../msq/querykit/InputNumberDataSource.java   |   6 +
 .../org/apache/druid/query/DataSource.java    |   9 +
 .../apache/druid/query/InlineDataSource.java  |   7 +
 .../apache/druid/query/JoinDataSource.java    |  33 +++
 .../apache/druid/query/LookupDataSource.java  |   7 +
 .../apache/druid/query/QueryDataSource.java   |   7 +
 .../apache/druid/query/TableDataSource.java   |   7 +
 .../apache/druid/query/UnionDataSource.java   |   7 +
 .../druid/query/JoinDataSourceTest.java       |  39 +++
 .../join/JoinableFactoryWrapperTest.java      | 271 ------------------
 .../druid/segment/join/NoopDataSource.java    |   7 +
 .../druid/client/CachingClusteredClient.java  |   3 +-
 .../appenderator/SinkQuerySegmentWalker.java  |   2 +-
 .../server/coordination/ServerManager.java    |   2 +-
 .../calcite/external/ExternalDataSource.java  |   7 +
 15 files changed, 139 insertions(+), 275 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
index 483344de394..2cbddaf910b 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
@@ -103,6 +103,12 @@ public class InputNumberDataSource implements DataSource
     return analysis.getDataSource().createSegmentMapFunction(query, new AtomicLong());
   }
 
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{};
+  }
+
   @JsonProperty
   public int getInputNumber()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java
index aad3ac1ef27..f515b0ed292 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -21,6 +21,7 @@ package org.apache.druid.query;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 
 import java.util.List;
@@ -99,4 +100,12 @@ public interface DataSource
    * @return the segment function
    */
   Function createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);
+
+  /**
+   * Computes the bytes this data source contributes to the cache key of a query that reads from it.
+   *
+   * @param dataSourceAnalysis analysis of the query's data source; a join reads its clauses and base filter from it
+   * @return the key bytes; an empty array when nothing is contributed or a join clause does not support caching
+   */
+  byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis);
 }
diff --git a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
index dc7606e1be2..200e6bab435 100644
--- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.RowAdapter;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.column.ColumnType;
@@ -243,6 +244,12 @@ public class InlineDataSource implements DataSource
     return Function.identity();
   }
 
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{};
+  }
+
   /**
    * Returns the row signature (map of column name to type) for this inline datasource.
Note that types may
    * be null, meaning we know we have a column with a certain name, but we don't know what its type is.
diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
index a3dda5ea664..4df0a62d796 100644
--- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
@@ -33,7 +33,9 @@ import org.apache.druid.common.guava.GuavaUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.planning.DataSourceAnalysis;
@@ -59,6 +61,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -86,6 +89,8 @@ public class JoinDataSource implements DataSource
   private final String rightPrefix;
   private final JoinConditionAnalysis conditionAnalysis;
   private final JoinType joinType;
+  private static final byte JOIN_OPERATION = 0x1;
+  private static final Logger log = new Logger(JoinDataSource.class);
   // An optional filter on the left side if left is direct table access
   @Nullable
   private final DimFilter leftFilter;
@@ -425,4 +430,32 @@ public class JoinDataSource implements DataSource
     );
     return segmentMapFn;
   }
+
+  /**
+   * Builds the join cache key from the base table filter and, per pre-joinable clause, the factory key, condition,
+   * prefix and join type. Throws IAE without clauses; returns an empty array when a clause has no factory key.
+   */
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    final List<PreJoinableClause> clauses = dataSourceAnalysis.getPreJoinableClauses();
+    if (clauses.isEmpty()) {
+      throw new IAE("No join clauses to build the cache key for data source [%s]", dataSourceAnalysis.getDataSource());
+    }
+    final CacheKeyBuilder keyBuilder = new CacheKeyBuilder(JOIN_OPERATION);
+    dataSourceAnalysis.getJoinBaseTableFilter().ifPresent(keyBuilder::appendCacheable);
+    for (PreJoinableClause clause : clauses) {
+      Optional<byte[]> bytes = joinableFactoryWrapper.getJoinableFactory().computeJoinCacheKey(clause.getDataSource(), clause.getCondition());
+      if (!bytes.isPresent()) {
+        // Encountered a data source which didn't support cache yet
+        log.debug("skipping caching for join since [%s] does not support caching", clause.getDataSource());
+        return new byte[]{};
+      }
+      keyBuilder.appendByteArray(bytes.get());
+      keyBuilder.appendString(clause.getCondition().getOriginalExpression());
+      keyBuilder.appendString(clause.getPrefix());
+      keyBuilder.appendString(clause.getJoinType().name());
+    }
+    return keyBuilder.build();
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
index 098a50374b1..c36fc6b424d 100644
--- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 
 import java.util.Collections;
@@ -108,6 +109,12 @@ public class LookupDataSource implements DataSource
     return Function.identity();
   }
 
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{};
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
index 5cff91e3ed1..807ea4bc2aa 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 
 import java.util.Collections;
@@ -100,6 +101,12 @@ public class QueryDataSource implements DataSource
     return Function.identity();
   }
 
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{}; // contributes no bytes to the cache key
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
index f547a1d263d..ac89b0b4584 100644
--- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 
 import java.util.Collections;
@@ -105,6 +106,12 @@ public class TableDataSource implements DataSource
     return Function.identity();
   }
 
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{}; // contributes no bytes to the cache key
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
index 6fd36f1188f..a857897e727 100644
--- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 
 import java.util.List;
@@ -117,6 +118,12 @@ public class UnionDataSource implements DataSource
     return Function.identity();
   }
 
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{}; // contributes no bytes to the cache key
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java
index c1fe72d18ee..2cdac7efadd 100644
--- a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java
@@ -22,12 +22,22 @@ package org.apache.druid.query;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.filter.TrueDimFilter;
+import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.query.planning.PreJoinableClause;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
 import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
+import org.apache.druid.segment.join.NoopDataSource;
+import org.apache.druid.segment.join.NoopJoinableFactory;
+import org.easymock.EasyMock;
 import org.easymock.Mock;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -35,7 +45,9 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
 
 
 public class JoinDataSourceTest
@@ -180,6 +192,33 @@ public class JoinDataSourceTest
     Assert.assertEquals(joinDataSource, deserialized);
   }
 
+  @Test
+  public void test_computeJoinDataSourceCacheKey_noClauses()
+  {
+    DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class);
+    JoinDataSource joinDataSource = JoinDataSource.create(
+        new TableDataSource("table1"),
+        new TableDataSource("table2"),
+        "j.",
+        "x == \"j.x\"",
+        JoinType.LEFT,
+        TrueDimFilter.instance(),
+        ExprMacroTable.nil(),
+        joinableFactoryWrapper
+    );
+    EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()); // no clauses at all
+    EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()); // never consumed: IAE comes first
+    EasyMock.expect(analysis.getDataSource()).andReturn(joinDataSource); // only read to format the error message
+    EasyMock.replay(analysis);
+
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage(StringUtils.format(
+        "No join clauses to build the cache key for data source [%s]",
+        joinDataSource
+    ));
+    joinDataSource.getCacheKey(analysis); // must throw: the analysis reports an empty clause list
+  }
+
   @Test
   public void testException_leftFilterOnNonTableSource()
   {
diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java
index 198c25782fc..e5eb8be18dc 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java
@@ -124,228 +124,6 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
@Rule public ExpectedException expectedException = ExpectedException.none(); - @Test - public void test_computeJoinDataSourceCacheKey_noClauses() - { - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - DataSource dataSource = new NoopDataSource(); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.emptyList()); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()); - EasyMock.expect(analysis.getDataSource()).andReturn(dataSource); - EasyMock.replay(analysis); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - - expectedException.expect(IAE.class); - expectedException.expectMessage(StringUtils.format( - "No join clauses to build the cache key for data source [%s]", - dataSource - )); - joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - } - - @Test - public void test_computeJoinDataSourceCacheKey_noHashJoin() - { - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j."); - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_2", "x != \"h.x\"", "h."); - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause1, clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.of(TrueDimFilter.instance())).anyTimes(); - EasyMock.replay(analysis); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - Optional cacheKey = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - - Assert.assertFalse(cacheKey.isPresent()); - } - - @Test - public void test_computeJoinDataSourceCacheKey_cachingUnsupported() - { - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j."); - DataSource dataSource = new LookupDataSource("lookup"); - PreJoinableClause 
clause2 = makePreJoinableClause(dataSource, "x == \"h.x\"", "h.", JoinType.LEFT); - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause1, clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.of(TrueDimFilter.instance())).anyTimes(); - EasyMock.replay(analysis); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - Optional cacheKey = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - - Assert.assertFalse(cacheKey.isPresent()); - } - - @Test - public void test_computeJoinDataSourceCacheKey_usableClauses() - { - - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j."); - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"h.x\"", "h."); - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Arrays.asList(clause1, clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - EasyMock.replay(analysis); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - Optional cacheKey = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - - Assert.assertTrue(cacheKey.isPresent()); - } - - @Test - public void test_computeJoinDataSourceCacheKey_keyChangesWithExpression() - { - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "y == \"j.y\"", "j."); - 
EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause1)).anyTimes(); - EasyMock.replay(analysis); - - Optional cacheKey1 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey1.isPresent()); - Assert.assertNotEquals(0, cacheKey1.get().length); - - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j."); - EasyMock.reset(analysis); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - EasyMock.replay(analysis); - Optional cacheKey2 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey2.isPresent()); - - Assert.assertFalse(Arrays.equals(cacheKey1.get(), cacheKey2.get())); - } - - @Test - public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinType() - { - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.LEFT); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause1)).anyTimes(); - EasyMock.replay(analysis); - - Optional cacheKey1 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey1.isPresent()); - Assert.assertNotEquals(0, cacheKey1.get().length); - - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j.", JoinType.INNER); - EasyMock.reset(analysis); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause2)).anyTimes(); - 
EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - EasyMock.replay(analysis); - Optional cacheKey2 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey2.isPresent()); - - Assert.assertFalse(Arrays.equals(cacheKey1.get(), cacheKey2.get())); - } - - @Test - public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix() - { - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "abc == xyz", "ab"); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause1)).anyTimes(); - EasyMock.replay(analysis); - - Optional cacheKey1 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey1.isPresent()); - Assert.assertNotEquals(0, cacheKey1.get().length); - - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_1", "abc == xyz", "xy"); - EasyMock.reset(analysis); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - EasyMock.replay(analysis); - Optional cacheKey2 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey2.isPresent()); - - Assert.assertFalse(Arrays.equals(cacheKey1.get(), cacheKey2.get())); - } - - @Test - public void test_computeJoinDataSourceCacheKey_keyChangesWithBaseFilter() - { - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.of(TrueDimFilter.instance())).anyTimes(); - JoinableFactoryWrapper 
joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "abc == xyz", "ab"); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause1)).anyTimes(); - EasyMock.replay(analysis); - - Optional cacheKey1 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey1.isPresent()); - Assert.assertNotEquals(0, cacheKey1.get().length); - - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_1", "abc == xyz", "ab"); - EasyMock.reset(analysis); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.of(FalseDimFilter.instance())).anyTimes(); - EasyMock.replay(analysis); - Optional cacheKey2 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey2.isPresent()); - - Assert.assertFalse(Arrays.equals(cacheKey1.get(), cacheKey2.get())); - } - - @Test - public void test_computeJoinDataSourceCacheKey_keyChangesWithJoinable() - { - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j."); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause1)).anyTimes(); - EasyMock.replay(analysis); - - Optional cacheKey1 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey1.isPresent()); - Assert.assertNotEquals(0, cacheKey1.get().length); - - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_2", "x == \"j.x\"", "j."); - 
EasyMock.reset(analysis); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - - EasyMock.replay(analysis); - Optional cacheKey2 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey2.isPresent()); - - Assert.assertFalse(Arrays.equals(cacheKey1.get(), cacheKey2.get())); - } - - @Test - public void test_computeJoinDataSourceCacheKey_sameKeyForSameJoin() - { - DataSourceAnalysis analysis = EasyMock.mock(DataSourceAnalysis.class); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactoryWithCacheKey()); - - PreJoinableClause clause1 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j."); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause1)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - EasyMock.replay(analysis); - - Optional cacheKey1 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey1.isPresent()); - Assert.assertNotEquals(0, cacheKey1.get().length); - - PreJoinableClause clause2 = makeGlobalPreJoinableClause("dataSource_1", "x == \"j.x\"", "j."); - EasyMock.reset(analysis); - EasyMock.expect(analysis.getPreJoinableClauses()).andReturn(Collections.singletonList(clause2)).anyTimes(); - EasyMock.expect(analysis.getJoinBaseTableFilter()).andReturn(Optional.empty()).anyTimes(); - EasyMock.replay(analysis); - Optional cacheKey2 = joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis); - Assert.assertTrue(cacheKey2.isPresent()); - - Assert.assertArrayEquals(cacheKey1.get(), cacheKey2.get()); - } @Test public void test_checkClausePrefixesForDuplicatesAndShadowing_noConflicts() @@ -824,53 +602,4 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest conversion ); } - 
- private PreJoinableClause makeGlobalPreJoinableClause(String tableName, String expression, String prefix) - { - return makeGlobalPreJoinableClause(tableName, expression, prefix, JoinType.LEFT); - } - - private PreJoinableClause makeGlobalPreJoinableClause( - String tableName, - String expression, - String prefix, - JoinType joinType - ) - { - GlobalTableDataSource dataSource = new GlobalTableDataSource(tableName); - return makePreJoinableClause(dataSource, expression, prefix, joinType); - } - - private PreJoinableClause makePreJoinableClause( - DataSource dataSource, - String expression, - String prefix, - JoinType joinType - ) - { - JoinConditionAnalysis conditionAnalysis = JoinConditionAnalysis.forExpression( - expression, - prefix, - ExprMacroTable.nil() - ); - return new PreJoinableClause( - prefix, - dataSource, - joinType, - conditionAnalysis - ); - } - - private static class JoinableFactoryWithCacheKey extends NoopJoinableFactory - { - @Override - public Optional computeJoinCacheKey(DataSource dataSource, JoinConditionAnalysis condition) - { - if (dataSource.isCacheable(false) && condition.canHashJoin()) { - String tableName = Iterators.getOnlyElement(dataSource.getTableNames().iterator()); - return Optional.of(StringUtils.toUtf8(tableName)); - } - return Optional.empty(); - } - } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java index c3cdf6b7764..e134a7a9f8c 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java +++ b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java @@ -21,6 +21,7 @@ package org.apache.druid.segment.join; import org.apache.druid.query.DataSource; import org.apache.druid.query.Query; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.SegmentReference; import java.util.List; @@ -77,4 +78,10 @@ public class NoopDataSource 
implements DataSource
   {
     return Function.identity();
   }
+
+  @Override
+  public byte[] getCacheKey(DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{};
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 69952e22ce3..c3ab1b4f05c 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -837,8 +837,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
     {
       Preconditions.checkNotNull(strategy, "strategy cannot be null");
       if (dataSourceAnalysis.isJoin()) {
-        byte[] joinDataSourceCacheKey = joinableFactoryWrapper.computeJoinDataSourceCacheKey(dataSourceAnalysis)
-                                                              .orElse(null);
-        if (null == joinDataSourceCacheKey) {
+        byte[] joinDataSourceCacheKey = dataSourceAnalysis.getDataSource().getCacheKey(dataSourceAnalysis);
+        if (null == joinDataSourceCacheKey || joinDataSourceCacheKey.length == 0) { // empty key: join is not cacheable
           return null; // A join operation that does not support caching
         }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 3456a9e6ec9..1d5f13d8701 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -178,7 +178,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
 
     // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()
-                                            ? joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis)
+                                            ? Optional.ofNullable(analysis.getDataSource().getCacheKey(analysis))
+                                                      .filter(key -> key.length > 0) // empty key: join is not cacheable
                                             : Optional.of(StringUtils.EMPTY_BYTES);
 
     Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 333525ee581..2681fdc8f43 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -198,7 +198,8 @@ public class ServerManager implements QuerySegmentWalker
 
     // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = analysis.isJoin()
-                                            ? joinableFactoryWrapper.computeJoinDataSourceCacheKey(analysis)
+                                            ? Optional.ofNullable(analysis.getDataSource().getCacheKey(analysis))
+                                                      .filter(key -> key.length > 0) // empty key: join is not cacheable
                                             : Optional.of(StringUtils.EMPTY_BYTES);
 
     final FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
index f685f889e21..81b05dfd368 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.column.RowSignature;
 
@@ -132,6 +133,12 @@ public class ExternalDataSource implements DataSource
     return Function.identity();
   }
 
+  @Override
+  public byte[] getCacheKey(final DataSourceAnalysis dataSourceAnalysis)
+  {
+    return new byte[]{};
+  }
+
   @Override
   public boolean equals(Object o)
   {