diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index 241bd80ff72..bf7733db180 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -165,7 +165,7 @@ public class TopNTypeInterfaceBenchmark
     queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
     queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));

-    // Use an IdentityExtractionFn to force usage of DimExtractionTopNAlgorithm
+    // Use an IdentityExtractionFn to force usage of HeapBasedTopNAlgorithm
     TopNQueryBuilder queryBuilderString = new TopNQueryBuilder()
         .dataSource("blah")
         .granularity(Granularities.ALL)
@@ -174,7 +174,7 @@ public class TopNTypeInterfaceBenchmark
         .intervals(intervalSpec)
         .aggregators(queryAggs);

-    // DimExtractionTopNAlgorithm is always used for numeric columns
+    // HeapBasedTopNAlgorithm is always used for numeric columns
     TopNQueryBuilder queryBuilderLong = new TopNQueryBuilder()
         .dataSource("blah")
         .granularity(Granularities.ALL)
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 1d1bccdd7b1..f1eab7fe268 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -120,36 +120,34 @@ public class TopNQueryEngine

     final TopNAlgorithm topNAlgorithm;
-    if (
-        selector.isHasExtractionFn() &&
+    if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
+      // heap based algorithm selection
+      if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
         // TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
-        // Once we have arbitrary dimension types following check should be replaced by checking
-        // that the column is of type long and single-value.
-        dimension.equals(ColumnHolder.TIME_COLUMN_NAME)
-    ) {
-      // A special TimeExtractionTopNAlgorithm is required, since DimExtractionTopNAlgorithm
-      // currently relies on the dimension cardinality to support lexicographic sorting
-      topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
-    } else if (selector.isHasExtractionFn()) {
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING
-                                               && columnCapabilities.isDictionaryEncoded())) {
-      // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't know
-      // which can happen for 'inline' data sources when this is run on the broker
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
-      // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
-      // a many-to-one mapping, since numeric types can't represent all possible values of other types.)
-      topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (selector.isAggregateAllMetrics()) {
-      // sorted by dimension
-      topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
-    } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
-      // high cardinality dimensions with larger result sets
-      topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+        // We might be able to use this for any long column with an extraction function, that is
+        // ValueType.LONG.equals(columnCapabilities.getType())
+        // but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm
+
+        // A special TimeExtractionTopNAlgorithm is required since HeapBasedTopNAlgorithm
+        // currently relies on the dimension cardinality to support lexicographic sorting
+        topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
+      } else {
+        topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
+      }
     } else {
-      // anything else
-      topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      // pool based algorithm selection
+      if (selector.isAggregateAllMetrics()) {
+        // if sorted by dimension, we can aggregate all metrics in a single pass; use the regular pooled algorithm
+        // for this
+        topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      } else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
+        // for high cardinality dimensions with larger result sets, we first aggregate with only the ordering
+        // aggregation to compute the top 'n' values, then aggregate the remaining metrics for only those 'n' values
+        topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
+      } else {
+        // anything else, use the regular pooled algorithm
+        topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
+      }
     }

     if (queryMetrics != null) {
       queryMetrics.algorithm(topNAlgorithm);
@@ -158,6 +156,40 @@ public class TopNQueryEngine
     return new TopNMapFn(query, topNAlgorithm);
   }

+  /**
+   * {@link PooledTopNAlgorithm} (and {@link AggregateTopNMetricFirstAlgorithm}, which utilizes the pooled
+   * algorithm) are optimized off-heap algorithms for aggregating dictionary encoded string columns. These algorithms
+   * rely on dictionary ids being unique, so that they can aggregate on the dictionary ids directly and defer
+   * {@link org.apache.druid.segment.DimensionSelector#lookupName(int)} until as late as possible in query processing.
+   *
+   * When these conditions are not met, we fall back to an on-heap algorithm, the {@link HeapBasedTopNAlgorithm}
+   * (or {@link TimeExtractionTopNAlgorithm}, a specialized form for long columns), which aggregates on the values of
+   * selectors.
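+   *
+   * For illustration (this example is not part of the patch): a string dimension read through an extraction
+   * function such as
+   * <pre>
+   *   new ExtractionDimensionSpec("dim1", "d0", new SubstringDimExtractionFn(0, 1))
+   * </pre>
+   * can map many dictionary ids to the same output value, and so must take the heap based path, while a plain
+   * {@link org.apache.druid.query.dimension.DefaultDimensionSpec} over a dictionary encoded string column whose
+   * dictionary values are unique can use the pooled algorithms.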
+   */
+  private static boolean requiresHeapAlgorithm(
+      final TopNAlgorithmSelector selector,
+      final TopNQuery query,
+      final ColumnCapabilities capabilities
+  )
+  {
+    if (selector.isHasExtractionFn()) {
+      // extraction functions can have a many-to-one mapping, and should use a heap algorithm
+      return true;
+    }
+
+    if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
+      // non-string output cannot use the pooled algorithm, even if the underlying selector supports it
+      return true;
+    }
+    if (capabilities != null && capabilities.getType() == ValueType.STRING) {
+      // string columns must use the on-heap algorithm unless they have the following capabilities
+      return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
+    } else {
+      // unknown capabilities (e.g. 'inline' datasources on the broker) and non-strings must use a heap algorithm
+      return true;
+    }
+  }
+
   public static boolean canApplyExtractionInPost(TopNQuery query)
   {
     return query.getDimensionSpec() != null
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index cea7c775a75..92b2e8a3fe9 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -59,7 +59,7 @@ public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregates
       throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
     }

-    // This method is used for the DimExtractionTopNAlgorithm only.
+    // This method is used for the HeapBasedTopNAlgorithm only.
     // Unlike regular topN we cannot rely on ordering to optimize.
     // Optimization possibly requires a reverse lookup from value to ID, which is
     // not possible when applying an extraction function
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 445d8d64495..8a468d2edc3 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -25,9 +25,12 @@ import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -41,6 +44,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
 import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
 import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
@@ -61,7 +65,10 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentWrangler;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.InlineJoinableFactory;
 import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.LookupJoinableFactory;
+import org.apache.druid.segment.join.MapJoinableFactoryTest;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
@@ -273,4 +280,26 @@ public class QueryStackTests

     return conglomerate;
   }
+
+  public static JoinableFactory makeJoinableFactoryForLookup(
+      LookupExtractorFactoryContainerProvider lookupProvider
+  )
+  {
+    return makeJoinableFactoryFromDefault(lookupProvider, null);
+  }
+
+  public static JoinableFactory makeJoinableFactoryFromDefault(
+      @Nullable LookupExtractorFactoryContainerProvider lookupProvider,
+      @Nullable Map<Class<? extends DataSource>, JoinableFactory> custom
+  )
+  {
+    ImmutableMap.Builder<Class<? extends DataSource>, JoinableFactory> builder = ImmutableMap.builder();
+    builder.put(InlineDataSource.class, new InlineJoinableFactory());
+    if (lookupProvider != null) {
+      builder.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider));
+    }
+    if (custom != null) {
+      builder.putAll(custom);
+    }
+    return MapJoinableFactoryTest.fromMap(builder.build());
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 6b836b83ea4..3f43f76c175 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -384,6 +384,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
     final DatabaseMetaData metaData = client.getMetaData();
     Assert.assertEquals(
         ImmutableList.of(
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
@@ -441,6 +447,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
     final DatabaseMetaData metaData = superuserClient.getMetaData();
     Assert.assertEquals(
         ImmutableList.of(
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 952b3cfe2f7..7c475e5fe03 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -422,7 +422,10 @@ public class BaseCalciteQueryTest extends CalciteTestBase
   @Before
   public void setUp() throws Exception
   {
-    walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
+    walker = CalciteTests.createMockWalker(
+        conglomerate,
+        temporaryFolder.newFolder()
+    );
   }

   @After
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 1606ef7e7b9..622842fd32e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
@@ -712,6 +713,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
+            .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -741,6 +743,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         CalciteTests.SUPER_USER_AUTH_RESULT,
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
+            .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
             .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
@@ -14997,6 +15000,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }

+  @Test
+  @Parameters(source = QueryContextForJoinProvider.class)
+  public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map<String, Object> queryContext) throws Exception
+  {
+    testQuery(
+        "SELECT druid.broadcast.dim4, COUNT(*)\n"
+        + "FROM druid.numfoo\n"
+        + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
+        + "GROUP BY 1 ORDER BY 2 LIMIT 4",
+        queryContext,
+        ImmutableList.of(
+            new TopNQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE3),
+                        new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
+                        "j0.",
+                        equalsCondition(
+                            DruidExpression.fromColumn("dim4"),
+                            DruidExpression.fromColumn("j0.dim4")
+                        ),
+                        JoinType.INNER
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
+                .threshold(4)
+                .aggregators(aggregators(new CountAggregatorFactory("a0")))
+                .context(queryContext)
+                .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a", 9L},
+            new Object[]{"b", 9L}
+        )
+    );
+  }
+
   /**
    * This is a provider of query contexts that should be used by join tests.
    * It tests various configs that can be passed to join queries. All the configs provided by this provider should
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index ca683a243ae..dfee466bcef 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -50,13 +50,17 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.ExpressionModule;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.GlobalTableDataSource;
+import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
@@ -72,8 +76,14 @@ import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.join.MapJoinableFactory;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.Joinable;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.join.table.IndexedTableJoinable;
+import org.apache.druid.segment.join.table.RowBasedIndexedTable;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.DruidNode;
@@ -125,6 +135,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.function.BooleanSupplier;
@@ -140,6 +151,7 @@ public class CalciteTests
   public static final String DATASOURCE3 = "numfoo";
   public static final String DATASOURCE4 = "foo4";
   public static final String DATASOURCE5 = "lotsocolumns";
+  public static final String BROADCAST_DATASOURCE = "broadcast";
   public static final String FORBIDDEN_DATASOURCE = "forbiddenDatasource";
   public static final String SOME_DATASOURCE = "some_datasource";
   public static final String SOME_DATSOURCE_ESCAPED = "some\\_datasource";
@@ -214,7 +226,7 @@ public class CalciteTests

   private static final String TIMESTAMP_COLUMN = "t";

-  private static final Injector INJECTOR = Guice.createInjector(
+  public static final Injector INJECTOR = Guice.createInjector(
       binder -> {
         binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
@@ -605,6 +617,68 @@ public class CalciteTests
       )
   );

+  private static final InlineDataSource JOINABLE_BACKING_DATA = InlineDataSource.fromIterable(
+      RAW_ROWS1_WITH_NUMERIC_DIMS.stream().map(x -> new Object[]{
+          x.get("dim1"),
+          x.get("dim2"),
+          x.get("dim3"),
+          x.get("dim4"),
+          x.get("dim5"),
+          x.get("d1"),
+          x.get("d2"),
+          x.get("f1"),
+          x.get("f2"),
+          x.get("l1"),
+          x.get("l2")
+      }).collect(Collectors.toList()),
+      RowSignature.builder()
+                  .add("dim1", ValueType.STRING)
+                  .add("dim2", ValueType.STRING)
+                  .add("dim3", ValueType.STRING)
+                  .add("dim4", ValueType.STRING)
+                  .add("dim5", ValueType.STRING)
+                  .add("d1", ValueType.DOUBLE)
+                  .add("d2", ValueType.DOUBLE)
+                  .add("f1", ValueType.FLOAT)
+                  .add("f2", ValueType.FLOAT)
+                  .add("l1", ValueType.LONG)
+                  .add("l2", ValueType.LONG)
+                  .build()
+  );
+
+  private static final Set<String> KEY_COLUMNS = ImmutableSet.of("dim4");
+
+  private static final RowBasedIndexedTable<Object[]> JOINABLE_TABLE = new RowBasedIndexedTable<>(
+      JOINABLE_BACKING_DATA.getRowsAsList(),
+      JOINABLE_BACKING_DATA.rowAdapter(),
+      JOINABLE_BACKING_DATA.getRowSignature(),
+      KEY_COLUMNS,
+      DateTimes.nowUtc().toString()
+  );
+
+  public static GlobalTableDataSource CUSTOM_TABLE = new GlobalTableDataSource(BROADCAST_DATASOURCE);
+
+  public static JoinableFactory CUSTOM_ROW_TABLE_JOINABLE = new JoinableFactory()
+  {
+    @Override
+    public boolean isDirectlyJoinable(DataSource dataSource)
+    {
+      return CUSTOM_TABLE.equals(dataSource);
+    }
+
+    @Override
+    public Optional<Joinable> build(
+        DataSource dataSource,
+        JoinConditionAnalysis condition
+    )
+    {
+      if (dataSource instanceof GlobalTableDataSource) {
+        return Optional.of(new IndexedTableJoinable(JOINABLE_TABLE));
+      }
+      return Optional.empty();
+    }
+  };
+
   private CalciteTests()
   {
     // No instantiation.
@@ -649,12 +723,28 @@ public class CalciteTests
     return INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class));
   }

+  public static JoinableFactory createDefaultJoinableFactory()
+  {
+    return QueryStackTests.makeJoinableFactoryFromDefault(
+        INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
+        ImmutableMap.of(
+            GlobalTableDataSource.class,
+            CUSTOM_ROW_TABLE_JOINABLE
+        )
+    );
+  }
+
   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
       final QueryRunnerFactoryConglomerate conglomerate,
       final File tmpDir
   )
   {
-    return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
+    return createMockWalker(
+        conglomerate,
+        tmpDir,
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
+        createDefaultJoinableFactory()
+    );
   }

   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
@@ -662,6 +752,16 @@ public class CalciteTests
       final File tmpDir,
       final QueryScheduler scheduler
   )
+  {
+    return createMockWalker(conglomerate, tmpDir, scheduler, null);
+  }
+
+  public static SpecificSegmentsQuerySegmentWalker createMockWalker(
+      final QueryRunnerFactoryConglomerate conglomerate,
+      final File tmpDir,
+      final QueryScheduler scheduler,
+      final JoinableFactory joinableFactory
+  )
   {
     final QueryableIndex index1 = IndexBuilder
         .create()
@@ -713,7 +813,7 @@ public class CalciteTests
     final QueryableIndex someDatasourceIndex = IndexBuilder
         .create()
-        .tmpDir(new File(tmpDir, "1"))
+        .tmpDir(new File(tmpDir, "6"))
         .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
         .schema(INDEX_SCHEMA)
         .rows(ROWS1)
@@ -721,7 +821,7 @@ public class CalciteTests
     final QueryableIndex someXDatasourceIndex = IndexBuilder
         .create()
-        .tmpDir(new File(tmpDir, "1"))
+        .tmpDir(new File(tmpDir, "7"))
         .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
         .schema(INDEX_SCHEMA_WITH_X_COLUMNS)
         .rows(RAW_ROWS1_X)
@@ -731,7 +831,7 @@ public class CalciteTests
     return new SpecificSegmentsQuerySegmentWalker(
         conglomerate,
         INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
-        null,
+        joinableFactory,
         scheduler
     ).add(
         DataSegment.builder()
@@ -805,6 +905,15 @@ public class CalciteTests
                    .size(0)
                    .build(),
         someXDatasourceIndex
+    ).add(
+        DataSegment.builder()
+                   .dataSource(BROADCAST_DATASOURCE)
+                   .interval(indexNumericDims.getDataInterval())
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build(),
+        indexNumericDims
     );
   }

@@ -979,8 +1088,15 @@ public class CalciteTests
     final DruidSchema schema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         new TestServerInventoryView(walker.getSegments()),
-        new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
-        new MapJoinableFactory(ImmutableMap.of()),
+        new SegmentManager(EasyMock.createMock(SegmentLoader.class))
+        {
+          @Override
+          public Set<String> getDataSourceNames()
+          {
+            return ImmutableSet.of(BROADCAST_DATASOURCE);
+          }
+        },
+        createDefaultJoinableFactory(),
         plannerConfig,
         viewManager,
         TEST_AUTHENTICATOR_ESCALATOR
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 0490420cdd7..7fde642ca8c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -40,10 +40,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentWrangler;
-import org.apache.druid.segment.join.InlineJoinableFactory;
 import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.LookupJoinableFactory;
-import org.apache.druid.segment.join.MapJoinableFactoryTest;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QueryStackTests;
@@ -91,12 +88,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C

     final JoinableFactory joinableFactoryToUse;
     if (joinableFactory == null) {
-      joinableFactoryToUse = MapJoinableFactoryTest.fromMap(
-          ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
-              .put(InlineDataSource.class, new InlineJoinableFactory())
-              .put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
-              .build()
-      );
+      joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup(lookupProvider);
     } else {
       joinableFactoryToUse = joinableFactory;
     }
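Note for reviewers: the sketch below is not part of the patch; it illustrates how the new test helpers are meant to compose. `CalciteTests.createDefaultJoinableFactory()` layers the custom broadcast joinable on top of the inline and lookup defaults from `QueryStackTests.makeJoinableFactoryFromDefault`, and the expanded `createMockWalker` overload threads that factory through to `SpecificSegmentsQuerySegmentWalker`. The enclosing class and method names here are hypothetical.

```java
import java.io.File;

import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;

public class BroadcastJoinWiringExample
{
  public static SpecificSegmentsQuerySegmentWalker makeWalker(
      QueryRunnerFactoryConglomerate conglomerate,
      File tmpDir
  )
  {
    // Default factory from this patch: handles inline datasources, lookups, and the custom
    // "broadcast" global table registered via CalciteTests.CUSTOM_ROW_TABLE_JOINABLE.
    JoinableFactory joinableFactory = CalciteTests.createDefaultJoinableFactory();

    // Passing the factory through makes GlobalTableDataSource("broadcast") directly joinable
    // in tests, which is what the new testTopNOnStringWithNonSortedOrUniqueDictionary exercises.
    return CalciteTests.createMockWalker(
        conglomerate,
        tmpDir,
        QueryStackTests.DEFAULT_NOOP_SCHEDULER,
        joinableFactory
    );
  }
}
```

Because the joinable built from `RowBasedIndexedTable` does not guarantee a sorted, unique dictionary for `j0.dim4`, the new SQL test forces the `requiresHeapAlgorithm` path above rather than the pooled algorithms.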