diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
index bf0e8d4c444..666af0d2e61 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.timeline.SegmentId;
@@ -79,6 +80,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -100,6 +102,13 @@ public class QueryRunnerTestHelper
       .collect(Collectors.toList())
   );
 
+  public static final DataSource UNNEST_DATA_SOURCE = UnnestDataSource.create(
+      new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+      QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+      QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+      null
+  );
+
   public static final Granularity DAY_GRAN = Granularities.DAY;
   public static final Granularity ALL_GRAN = Granularities.ALL;
   public static final Granularity MONTH_GRAN = Granularities.MONTH;
@@ -109,6 +118,7 @@ public class QueryRunnerTestHelper
   public static final String PLACEMENT_DIMENSION = "placement";
   public static final String PLACEMENTISH_DIMENSION = "placementish";
   public static final String PARTIAL_NULL_DIMENSION = "partial_null_column";
+  public static final String PLACEMENTISH_DIMENSION_UNNEST = "placementish_unnest";
 
   public static final List<String> DIMENSIONS = Lists.newArrayList(
       MARKET_DIMENSION,
@@ -353,6 +363,7 @@ public class QueryRunnerTestHelper
     return !("rtIndex".equals(runnerName) || "noRollupRtIndex".equals(runnerName));
   }
 
+
   public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunners(
       QueryRunnerFactory<T, QueryType> factory
   )
@@ -395,6 +406,7 @@ public class QueryRunnerTestHelper
     );
   }
 
+
   public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
       QueryRunnerFactory<T, QueryType> factory,
       String resourceFileName,
@@ -467,6 +479,22 @@ public class QueryRunnerTestHelper
     };
   }
 
+
+  public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunnerWithSegmentMapFn(
+      QueryRunnerFactory<T, QueryType> factory,
+      Segment adapter,
+      Query<T> query,
+      final String runnerName
+  )
+  {
+    final DataSource base = query.getDataSource();
+
+    final SegmentReference segmentReference = base.createSegmentMapFunction(query, new AtomicLong())
+                                                  .apply(ReferenceCountingSegment.wrapRootGenerationSegment(
+                                                      adapter));
+    return makeQueryRunner(factory, segmentReference, runnerName);
+  }
+
   public static <T> QueryRunner<T> makeFilteringQueryRunner(
       final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
       final QueryRunnerFactory<T, Query<T>> factory
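For context on what the new helper does: the datasource's segment-map function decides how a physical segment must be decorated before a query runs on it, and for an unnest datasource that means wrapping the base segment in an unnesting view. Below is a minimal, self-contained sketch of that pattern. The types here (BaseSegment, UnnestSegment) are simplified stand-ins invented for illustration, not the real Druid classes; the real UnnestDataSource.createSegmentMapFunction decorates a ReferenceCountingSegment analogously.

// SegmentMapFnSketch.java -- illustrative only; names are hypothetical stand-ins.
import java.util.function.Function;

public class SegmentMapFnSketch
{
  interface Segment {}

  static class BaseSegment implements Segment {}

  // Stand-in for the wrapper an unnest datasource would produce around a segment.
  static class UnnestSegment implements Segment
  {
    final Segment delegate;
    final String columnToUnnest;

    UnnestSegment(Segment delegate, String columnToUnnest)
    {
      this.delegate = delegate;
      this.columnToUnnest = columnToUnnest;
    }
  }

  // A plain table returns the identity function; an unnest datasource returns
  // a function that wraps the segment it is applied to.
  static Function<Segment, Segment> createSegmentMapFunction(boolean unnest, String column)
  {
    return unnest ? segment -> new UnnestSegment(segment, column) : Function.<Segment>identity();
  }

  public static void main(String[] args)
  {
    Segment mapped = createSegmentMapFunction(true, "placementish").apply(new BaseSegment());
    System.out.println(mapped.getClass().getSimpleName()); // UnnestSegment
  }
}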
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java
new file mode 100644
index 00000000000..cc2a722d460
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java
@@ -0,0 +1,795 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.extraction.StringFormatExtractionFn;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
+import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
+import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+
+@RunWith(Parameterized.class)
+public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest
+{
+  private static TestGroupByBuffers BUFFER_POOLS = null;
+
+  private final GroupByQueryRunnerFactory factory;
+  private final GroupByQueryConfig config;
+  private final boolean vectorize;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  public UnnestGroupByQueryRunnerTest(
+      GroupByQueryConfig config,
+      GroupByQueryRunnerFactory factory,
+      boolean vectorize
+  )
+  {
+    this.config = config;
+    this.factory = factory;
+    this.vectorize = vectorize;
+  }
+
+  public static List<GroupByQueryConfig> testConfigs()
+  {
+
+    final GroupByQueryConfig v2Config = new GroupByQueryConfig()
+    {
+      @Override
+      public String getDefaultStrategy()
+      {
+        return GroupByStrategySelector.STRATEGY_V2;
+      }
+
+      @Override
+      public int getBufferGrouperInitialBuckets()
+      {
+        // Small initial table to force some growing.
+        return 4;
+      }
+
+      @Override
+      public String toString()
+      {
+        return "v2";
+      }
+    };
+
+    return ImmutableList.of(
+        v2Config
+    );
+  }
+
+  public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+      final GroupByQueryConfig config,
+      final TestGroupByBuffers bufferPools
+  )
+  {
+    return makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        config,
+        bufferPools,
+        GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG
+    );
+  }
+
+  public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+      final ObjectMapper mapper,
+      final GroupByQueryConfig config,
+      final TestGroupByBuffers bufferPools
+  )
+  {
+    return makeQueryRunnerFactory(mapper, config, bufferPools, GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG);
+  }
+
+  public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+      final ObjectMapper mapper,
+      final GroupByQueryConfig config,
+      final TestGroupByBuffers bufferPools,
+      final DruidProcessingConfig processingConfig
+  )
+  {
+    if (bufferPools.getBufferSize() != processingConfig.intermediateComputeSizeBytes()) {
+      throw new ISE(
+          "Provided buffer size [%,d] does not match configured size [%,d]",
+          bufferPools.getBufferSize(),
+          processingConfig.intermediateComputeSizeBytes()
+      );
+    }
+    if (bufferPools.getNumMergeBuffers() != processingConfig.getNumMergeBuffers()) {
+      throw new ISE(
+          "Provided merge buffer count [%,d] does not match configured count [%,d]",
+          bufferPools.getNumMergeBuffers(),
+          processingConfig.getNumMergeBuffers()
+      );
+    }
+    final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
+    final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
+        configSupplier,
+        new GroupByStrategyV1(
+            configSupplier,
+            new GroupByQueryEngine(configSupplier, bufferPools.getProcessingPool()),
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        ),
+        new GroupByStrategyV2(
+            processingConfig,
+            configSupplier,
+            bufferPools.getProcessingPool(),
+            bufferPools.getMergePool(),
+            TestHelper.makeJsonMapper(),
+            mapper,
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        )
+    );
+    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector);
+    return new GroupByQueryRunnerFactory(strategySelector, toolChest);
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> constructorFeeder()
+  {
+    NullHandling.initializeForTests();
+    setUpClass();
+
+    final List<Object[]> constructors = new ArrayList<>();
+    for (GroupByQueryConfig config : testConfigs()) {
+      final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS);
+
+      for (boolean vectorize : ImmutableList.of(false)) {
+        // Add vectorization tests for any indexes that support it.
+        if (!vectorize ||
+            config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+          constructors.add(new Object[]{config, factory, vectorize});
+        }
+      }
+
+    }
+
+    return constructors;
+  }
+
+  @BeforeClass
+  public static void setUpClass()
+  {
+    if (BUFFER_POOLS == null) {
+      BUFFER_POOLS = TestGroupByBuffers.createDefault();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownClass()
+  {
+    BUFFER_POOLS.close();
+    BUFFER_POOLS = null;
+  }
+
+  private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals)
+  {
+    return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);
+  }
+
+  private static ResultRow makeRow(final GroupByQuery query, final DateTime timestamp, final Object... vals)
+  {
+    return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);
+  }
+
+  private static List<ResultRow> makeRows(
+      final GroupByQuery query,
+      final String[] columnNames,
+      final Object[]... values
+  )
+  {
+    return GroupByQueryRunnerTestHelper.createExpectedRows(query, columnNames, values);
+  }
+
+  @Test
+  public void testGroupBy()
+  {
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(UnnestDataSource.create(
+            new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+            null
+        ))
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(
+            QueryRunnerTestHelper.ROWS_COUNT,
+            new LongSumAggregatorFactory("idx", "index")
+        )
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .build();
+
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "automotive",
+            "rows", 2L,
+            "idx", 270L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "business",
+            "rows", 2L,
+            "idx", 236L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "entertainment",
+            "rows", 2L,
+            "idx", 316L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "health",
+            "rows", 2L,
+            "idx", 240L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "mezzanine",
+            "rows", 6L,
+            "idx", 5740L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "news",
+            "rows", 2L,
+            "idx", 242L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "premium",
+            "rows", 6L,
+            "idx", 5800L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "technology",
+            "rows", 2L,
+            "idx", 156L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias", "travel",
+            "rows", 2L,
+            "idx", 238L
+        ),
+
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "automotive",
+            "rows", 2L,
+            "idx", 294L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "business",
+            "rows", 2L,
+            "idx", 224L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "entertainment",
+            "rows", 2L,
+            "idx", 332L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "health",
+            "rows", 2L,
+            "idx", 226L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "mezzanine",
+            "rows", 6L,
+            "idx", 4894L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "news",
+            "rows", 2L,
+            "idx", 228L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "premium",
+            "rows", 6L,
+            "idx", 5010L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "technology",
+            "rows", 2L,
+            "idx", 194L
+        ),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias", "travel",
+            "rows", 2L,
+            "idx", 252L
+        )
+    );
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final QueryRunner<ResultRow> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        factory,
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
+  }
+
+  @Test
+  public void testGroupByOnMissingColumn()
+  {
+    // Cannot vectorize due to extraction dimension spec.
+    cannotVectorize();
+
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(UnnestDataSource.create(
+            new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+            null
+        ))
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(
+            new DefaultDimensionSpec("nonexistent0", "alias0"),
+            new ExtractionDimensionSpec("nonexistent1", "alias1", new StringFormatExtractionFn("foo"))
+        ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+        .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
+        .build();
+
+    List<ResultRow> expectedResults = Collections.singletonList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", null,
+            "alias1", "foo",
+            "rows", 52L
+        )
+    );
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final QueryRunner<ResultRow> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        factory,
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "missing-column");
+  }
+
+  @Test
+  public void testGroupByOnUnnestedColumn()
+  {
+    cannotVectorize();
+
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(
+            new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0")
+        ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+        .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
+        .build();
+
+    // Total rows should add up to 26 * 2 = 52
+    // 26 rows and each has 2 entries in the column to be unnested
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "a",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "preferred",
+            "rows", 26L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "b",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "e",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "h",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "m",
+            "rows", 6L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "n",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "p",
+            "rows", 6L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "t",
+            "rows", 4L
+        )
+    );
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final QueryRunner<ResultRow> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        factory,
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "groupBy-on-unnested-column");
+  }
+
+  @Test
+  public void testGroupByOnUnnestedVirtualColumn()
+  {
+    cannotVectorize();
+
+    final DataSource unnestDataSource = UnnestDataSource.create(
+        new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+        "vc",
+        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+        null
+    );
+
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(unnestDataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(
+            new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0")
+        ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+        .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
+        .setVirtualColumns(
+            new ExpressionVirtualColumn(
+                "vc",
+                "mv_to_array(placementish)",
+                ColumnType.STRING_ARRAY,
+                TestExprMacroTable.INSTANCE
+            )
+        )
+        .addOrderByColumn("alias0", OrderByColumnSpec.Direction.ASCENDING)
+        .build();
+
+    // Total rows should add up to 26 * 2 = 52
+    // 26 rows and each has 2 entries in the column to be unnested
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "preferred",
+            "rows", 26L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "e",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "b",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "h",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "a",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "m",
+            "rows", 6L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "n",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "p",
+            "rows", 6L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "t",
+            "rows", 4L
+        )
+    );
+
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final QueryRunner<ResultRow> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        factory,
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
+
+    TestHelper.assertExpectedObjects(expectedResults, results, "groupBy-on-unnested-virtual-column");
+  }
+
+  @Test
+  public void testGroupByOnUnnestedVirtualMultiColumn()
+  {
+    cannotVectorize();
+
+    final DataSource unnestDataSource = UnnestDataSource.create(
+        new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+        "vc",
+        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+        null
+    );
+
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(unnestDataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(
+            new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0")
+        ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+        .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
+        .setVirtualColumns(
+            new ExpressionVirtualColumn(
+                "vc",
+                "array(\"market\",\"quality\")",
+                ColumnType.STRING,
+                TestExprMacroTable.INSTANCE
+            )
+        )
+        .setLimit(3)
+        .build();
+
+    // Total rows should add up to 26 * 2 = 52
+    // 26 rows and each has 2 entries in the virtual array column to be unnested
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "business",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "health",
+            "rows", 2L
+        ),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias0", "travel",
+            "rows", 2L
+        )
+    );
+
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final QueryRunner<ResultRow> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        factory,
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "groupBy-on-unnested-virtual-columns");
+  }
+
+  /**
+   * Use this method instead of GroupByQuery.builder() to make sure the context is set properly. Also, avoid
+   * setContext in tests. Only use overrideContext.
+   */
+  private GroupByQuery.Builder makeQueryBuilder()
+  {
+    return GroupByQuery.builder().overrideContext(makeContext());
+  }
+
+  /**
+   * Use this method instead of new GroupByQuery.Builder(query) to make sure the context is set properly. Also,
+   * avoid setContext in tests. Only use overrideContext.
+   */
+  private GroupByQuery.Builder makeQueryBuilder(final GroupByQuery query)
+  {
+    return new GroupByQuery.Builder(query).overrideContext(makeContext());
+  }
+
+  private Map<String, Object> makeContext()
+  {
+    return ImmutableMap.<String, Object>builder()
+                       .put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false")
+                       .put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false")
+                       .put("vectorSize", 16) // Small vector size to ensure we use more than one.
+                       .build();
+  }
+
+  private void cannotVectorize()
+  {
+    if (vectorize && config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+      expectedException.expect(RuntimeException.class);
+      expectedException.expectMessage("Cannot vectorize!");
+    }
+  }
+}
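The expected counts in these groupBy tests fall out of unnest semantics: each input row contributes one output row per value in the multi-value (or virtual array) column, and groupBy then counts the expanded rows, which is how "preferred" reaches 26 across 26 input rows. A plain-Java sketch of that expansion, using made-up sample data rather than the actual TestIndex contents:

// UnnestCountSketch.java -- illustrative only; the sample rows are invented.
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UnnestCountSketch
{
  public static void main(String[] args)
  {
    // Each entry mimics one input row's "placementish" multi-value column.
    List<List<String>> placementish = Arrays.asList(
        Arrays.asList("a", "preferred"),
        Arrays.asList("b", "preferred"),
        Arrays.asList("m", "preferred")
    );

    Map<String, Long> counts = new LinkedHashMap<>();
    for (List<String> row : placementish) {
      for (String value : row) {            // the unnest step: one output row per value
        counts.merge(value, 1L, Long::sum); // the groupBy step: count rows per value
      }
    }
    System.out.println(counts); // {a=1, preferred=3, b=1, m=1}
  }
}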
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
index 4840a1ccfd1..b003f9580e8 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
@@ -284,6 +284,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         V_0112_0114
     );
 
@@ -334,6 +335,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         V_0112_0114
     ),
     legacy ? Lists.newArrayList(getTimestampName(), "market", "index") : Lists.newArrayList("market", "index"),
@@ -371,6 +373,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
        V_0112_0114
     ),
     legacy ? Lists.newArrayList(getTimestampName(), "market", "index") : Lists.newArrayList("market", "index"),
@@ -403,6 +406,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         // filtered values with day granularity
         new String[]{
             "2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
@@ -466,6 +470,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         // filtered values with day granularity
         new String[]{
             "2011-01-12T00:00:00.000Z\ttotal_market\tmezzanine\tpreferred\tmpreferred\t1000.000000",
@@ -529,6 +534,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
     final List<List<Map<String, Object>>> events = toEvents(
         legacy ? new String[]{getTimestampName() + ":TIME"} : new String[0],
+        legacy,
         V_0112_0114
     );
 
@@ -592,6 +598,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         (String[]) ArrayUtils.addAll(seg1Results, seg2Results)
     );
 
@@ -681,6 +688,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         expectedRet
     );
     if (legacy) {
@@ -769,6 +777,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         (String[]) ArrayUtils.addAll(seg1Results, seg2Results)
     );
     if (legacy) {
@@ -861,6 +870,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             null,
             QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
         },
+        legacy,
         expectedRet //segments in reverse order from above
     );
     if (legacy) {
@@ -1008,11 +1018,12 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
             "indexMaxFloat",
             "quality_uniques"
         },
+        legacy,
         valueSet
     );
   }
 
-  private List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, final String[]... valueSet)
+  public static List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, boolean legacy, final String[]... valueSet)
   {
     List<String> values = new ArrayList<>();
     for (String[] vSet : valueSet) {
@@ -1074,13 +1085,30 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
           if (specs.length == 1 || specs[1].equals("STRING")) {
             eventVal = values1[i];
           } else if (specs[1].equals("TIME")) {
-            eventVal = toTimestamp(values1[i]);
+            eventVal = toTimestamp(values1[i], legacy);
           } else if (specs[1].equals("FLOAT")) {
-            eventVal = values1[i].isEmpty() ? NullHandling.defaultFloatValue() : Float.valueOf(values1[i]);
+            try {
+              eventVal = values1[i].isEmpty() ? NullHandling.defaultFloatValue() : Float.valueOf(values1[i]);
+            }
+            catch (NumberFormatException nfe) {
+              throw new ISE("This object cannot be converted to a Float!");
+            }
           } else if (specs[1].equals("DOUBLE")) {
-            eventVal = values1[i].isEmpty() ? NullHandling.defaultDoubleValue() : Double.valueOf(values1[i]);
+            try {
+              eventVal = values1[i].isEmpty()
+                         ? NullHandling.defaultDoubleValue()
+                         : Double.valueOf(values1[i]);
+            }
+            catch (NumberFormatException nfe) {
+              throw new ISE("This object cannot be converted to a Double!");
+            }
           } else if (specs[1].equals("LONG")) {
-            eventVal = values1[i].isEmpty() ? NullHandling.defaultLongValue() : Long.valueOf(values1[i]);
+            try {
+              eventVal = values1[i].isEmpty() ? NullHandling.defaultLongValue() : Long.valueOf(values1[i]);
+            }
+            catch (NumberFormatException nfe) {
+              throw new ISE("This object cannot be converted to a Long!");
+            }
           } else if (specs[1].equals("NULL")) {
             eventVal = null;
           } else if (specs[1].equals("STRINGS")) {
@@ -1099,7 +1127,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
     return events;
   }
 
-  private Object toTimestamp(final String value)
+  private static Object toTimestamp(final String value, boolean legacy)
   {
     if (legacy) {
       return DateTimes.of(value);
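The `legacy` flag that toEvents() and toTimestamp() now take as an explicit parameter (needed once the methods became static) switches between the two scan result shapes: legacy results carry a parsed date-time object under a "timestamp" key, while non-legacy results carry epoch millis under "__time". A simplified stand-in for that distinction, using java.time in place of Druid's DateTimes:

// TimestampModeSketch.java -- simplified stand-in, not the Druid implementation.
import java.time.Instant;

public class TimestampModeSketch
{
  static Object toTimestamp(String value, boolean legacy)
  {
    Instant parsed = Instant.parse(value);
    // legacy: keep a date-time object ("timestamp"); modern: epoch millis ("__time")
    return legacy ? parsed : parsed.toEpochMilli();
  }

  public static void main(String[] args)
  {
    System.out.println(toTimestamp("2011-01-12T00:00:00.000Z", true));  // 2011-01-12T00:00:00Z
    System.out.println(toTimestamp("2011-01-12T00:00:00.000Z", false)); // 1294790400000
  }
}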
diff --git a/processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java
new file mode 100644
index 00000000000..4de22cb0061
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+
+@RunWith(Parameterized.class)
+public class UnnestScanQueryRunnerTest extends InitializedNullHandlingTest
+{
+  public static final QuerySegmentSpec I_0112_0114 = ScanQueryRunnerTest.I_0112_0114;
+  private static final ScanQueryQueryToolChest TOOL_CHEST = new ScanQueryQueryToolChest(
+      new ScanQueryConfig(),
+      DefaultGenericQueryMetricsFactory.instance()
+  );
+  private static final ScanQueryRunnerFactory FACTORY = new ScanQueryRunnerFactory(
+      TOOL_CHEST,
+      new ScanQueryEngine(),
+      new ScanQueryConfig()
+  );
+  private final IncrementalIndex index;
+  private final boolean legacy;
+
+  public UnnestScanQueryRunnerTest(final IncrementalIndex index, final boolean legacy)
+  {
+    this.index = index;
+    this.legacy = legacy;
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    NullHandling.initializeForTests();
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final List<Object[]> constructors = new ArrayList<>();
+    constructors.add(new Object[]{rtIndex, true});
+    constructors.add(new Object[]{rtIndex, false});
+    return constructors;
+  }
+
+  private Druids.ScanQueryBuilder newTestUnnestQuery()
+  {
+    return Druids.newScanQueryBuilder()
+                 .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
+                 .columns(Collections.emptyList())
+                 .eternityInterval()
+                 .limit(3)
+                 .legacy(legacy);
+  }
+
+  private Druids.ScanQueryBuilder newTestUnnestQueryWithAllowSet()
+  {
+    List<String> allowList = Arrays.asList("a", "b", "c");
+    LinkedHashSet<String> allowSet = new LinkedHashSet<>(allowList);
+    return Druids.newScanQueryBuilder()
+                 .dataSource(UnnestDataSource.create(
+                     new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+                     QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+                     QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+                     allowSet
+                 ))
+                 .columns(Collections.emptyList())
+                 .eternityInterval()
+                 .limit(3)
+                 .legacy(legacy);
+  }
+
+  @Test
+  public void testScanOnUnnest()
+  {
+    ScanQuery query = newTestUnnestQuery()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .build();
+
+    final QueryRunner<ScanResultValue> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        FACTORY,
+        new IncrementalIndexSegment(
+            index,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tpreferred",
+          "2011-01-12T00:00:00.000Z\tb"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "preferred",
+          "b"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerVirtualColumnsUsingSingleColumn()
+  {
+    ScanQuery query =
+        Druids.newScanQueryBuilder()
+              .intervals(I_0112_0114)
+              .dataSource(UnnestDataSource.create(
+                  new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+                  "vc",
+                  QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+                  null
+              ))
+              .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+              .eternityInterval()
+              .legacy(legacy)
+              .virtualColumns(
+                  new ExpressionVirtualColumn(
+                      "vc",
+                      "mv_to_array(placementish)",
+                      ColumnType.STRING,
+                      TestExprMacroTable.INSTANCE
+                  )
+              )
+              .limit(3)
+              .build();
+
+    QueryRunner<ScanResultValue> vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        FACTORY,
+        new IncrementalIndexSegment(
+            index,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+    Iterable<ScanResultValue> results = vcrunner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tpreferred",
+          "2011-01-12T00:00:00.000Z\tb"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "preferred",
+          "b"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerVirtualColumnsUsingMultipleColumn()
+  {
+    ScanQuery query =
+        Druids.newScanQueryBuilder()
+              .intervals(I_0112_0114)
+              .dataSource(UnnestDataSource.create(
+                  new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+                  "vc",
+                  QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+                  null
+              ))
+              .columns(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+              .eternityInterval()
+              .legacy(legacy)
+              .virtualColumns(
+                  new ExpressionVirtualColumn(
+                      "vc",
+                      "array(\"market\",\"quality\")",
+                      ColumnType.STRING,
+                      TestExprMacroTable.INSTANCE
+                  )
+              )
+              .limit(4)
+              .build();
+
+    QueryRunner<ScanResultValue> vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        FACTORY,
+        new IncrementalIndexSegment(
+            index,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    Iterable<ScanResultValue> results = vcrunner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.MARKET_DIMENSION,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.MARKET_DIMENSION,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\tspot\tspot",
+          "2011-01-12T00:00:00.000Z\tspot\tautomotive",
+          "2011-01-12T00:00:00.000Z\tspot\tspot",
+          "2011-01-12T00:00:00.000Z\tspot\tbusiness",
+      };
+    } else {
+      values = new String[]{
+          "spot\tspot",
+          "spot\tautomotive",
+          "spot\tspot",
+          "spot\tbusiness"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(
+            getTimestampName(),
+            QueryRunnerTestHelper.MARKET_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+        )
+        : Lists.newArrayList(
+            QueryRunnerTestHelper.MARKET_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+        ),
+        0,
+        4
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerWithFilter()
+  {
+    ScanQuery query = newTestUnnestQuery()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .filters(new SelectorDimFilter(QueryRunnerTestHelper.MARKET_DIMENSION, "spot", null))
+        .build();
+
+    final QueryRunner<ScanResultValue> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        FACTORY,
+        new IncrementalIndexSegment(
+            index,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tpreferred",
+          "2011-01-12T00:00:00.000Z\tb"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "preferred",
+          "b"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerWithOrdering()
+  {
+    ScanQuery query = newTestUnnestQuery()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.TIME_DIMENSION, QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .filters(new SelectorDimFilter(QueryRunnerTestHelper.MARKET_DIMENSION, "spot", null))
+        .order(ScanQuery.Order.ASCENDING)
+        .build();
+
+
+    final QueryRunner<ScanResultValue> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        FACTORY,
+        new IncrementalIndexSegment(
+            index,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          ColumnHolder.TIME_COLUMN_NAME,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    values = new String[]{
+        "2011-01-12T00:00:00.000Z\ta",
+        "2011-01-12T00:00:00.000Z\tpreferred",
+        "2011-01-12T00:00:00.000Z\tb"
+    };
+
+    final List<List<Map<String, Object>>> ascendingEvents = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
+    if (legacy) {
+      for (List<Map<String, Object>> batch : ascendingEvents) {
+        for (Map<String, Object> event : batch) {
+          event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
+        }
+      }
+    } else {
+      for (List<Map<String, Object>> batch : ascendingEvents) {
+        for (Map<String, Object> event : batch) {
+          event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis());
+        }
+      }
+    }
+    List<ScanResultValue> ascendingExpectedResults = toExpected(
+        ascendingEvents,
+        legacy ?
+        Lists.newArrayList(
+            QueryRunnerTestHelper.TIME_DIMENSION,
+            getTimestampName(),
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+        ) :
+        Lists.newArrayList(
+            QueryRunnerTestHelper.TIME_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+        ),
+        0,
+        3
+    );
+
+    ScanQueryRunnerTest.verify(ascendingExpectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerNonNullAllowSet()
+  {
+    ScanQuery query = newTestUnnestQueryWithAllowSet()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .build();
+
+    final QueryRunner<ScanResultValue> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        FACTORY,
+        new IncrementalIndexSegment(
+            index,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
+
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tb",
+          "2011-01-13T00:00:00.000Z\ta"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "b",
+          "a"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+
+  private String getTimestampName()
+  {
+    return legacy ? "timestamp" : ColumnHolder.TIME_COLUMN_NAME;
+  }
+
+  private List<ScanResultValue> toExpected(
+      List<List<Map<String, Object>>> targets,
+      List<String> columns,
+      final int offset,
+      final int limit
+  )
+  {
+    List<ScanResultValue> expected = Lists.newArrayListWithExpectedSize(targets.size());
+    for (List<Map<String, Object>> group : targets) {
+      List<Map<String, Object>> events = Lists.newArrayListWithExpectedSize(limit);
+      int end = Math.min(group.size(), offset + limit);
+      if (end == 0) {
+        end = group.size();
+      }
+      events.addAll(group.subList(offset, end));
+      expected.add(new ScanResultValue(QueryRunnerTestHelper.SEGMENT_ID.toString(), columns, events));
+    }
+    return expected;
+  }
+}
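testUnnestRunnerNonNullAllowSet above depends on the allow-set behavior of the unnest datasource: when a non-null set is supplied, only unnested values contained in it are emitted, which is why "preferred" disappears from the expected values. A rough model of that filtering with illustrative data (not the actual TestIndex rows):

// AllowSetUnnestSketch.java -- illustrative model only.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class AllowSetUnnestSketch
{
  static List<String> unnest(List<List<String>> rows, LinkedHashSet<String> allowSet)
  {
    List<String> out = new ArrayList<>();
    for (List<String> row : rows) {
      for (String value : row) {
        // null allow set means "emit everything"; otherwise filter during unnest
        if (allowSet == null || allowSet.contains(value)) {
          out.add(value);
        }
      }
    }
    return out;
  }

  public static void main(String[] args)
  {
    List<List<String>> rows = Arrays.asList(
        Arrays.asList("a", "preferred"),
        Arrays.asList("b", "preferred")
    );
    System.out.println(unnest(rows, new LinkedHashSet<>(Arrays.asList("a", "b", "c")))); // [a, b]
    System.out.println(unnest(rows, null)); // [a, preferred, b, preferred]
  }
}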
diff --git a/processing/src/test/java/org/apache/druid/query/topn/UnnestTopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/UnnestTopNQueryRunnerTest.java
new file mode 100644
index 00000000000..cfb50d06823
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/topn/UnnestTopNQueryRunnerTest.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.TestQueryRunners;
+import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
+{
+  private static final Closer RESOURCE_CLOSER = Closer.create();
+  private final List<AggregatorFactory> commonAggregators;
+
+
+  public UnnestTopNQueryRunnerTest(
+      List<AggregatorFactory> commonAggregators
+  )
+  {
+    this.commonAggregators = commonAggregators;
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException
+  {
+    RESOURCE_CLOSER.close();
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    List<Object[]> constructors = new ArrayList<>();
+    constructors.add(new Object[]{QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS});
+    return constructors;
+  }
+
+  private Sequence<Result<TopNResultValue>> assertExpectedResultsWithCustomRunner(
+      Iterable<Result<TopNResultValue>> expectedResults,
+      TopNQuery query,
+      QueryRunner<Result<TopNResultValue>> runner
+  )
+  {
+    final Sequence<Result<TopNResultValue>> retval = runWithMerge(query, runner);
+    TestHelper.assertExpectedResults(expectedResults, retval);
+    return retval;
+  }
+
+  private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query, QueryRunner<Result<TopNResultValue>> runner)
+  {
+    return runWithMerge(query, ResponseContext.createEmpty(), runner);
+  }
+
+
+  private Sequence<Result<TopNResultValue>> runWithMerge(
+      TopNQuery query,
+      ResponseContext context,
+      QueryRunner<Result<TopNResultValue>> runner1
+  )
+  {
+    final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(new TopNQueryConfig());
+    final QueryRunner<Result<TopNResultValue>> mergeRunner = new FinalizeResultsQueryRunner<>(
+        chest.mergeResults(runner1),
+        chest
+    );
+    return mergeRunner.run(QueryPlus.wrap(query), context);
+  }
+
+  @Test
+  public void testEmptyTopN()
+  {
+    final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
+        .granularity(QueryRunnerTestHelper.ALL_GRAN)
+        .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.EMPTY_INTERVAL)
+        .aggregators(
+            Lists.newArrayList(
+                Iterables.concat(
+                    commonAggregators,
+                    Lists.newArrayList(
+                        new DoubleMaxAggregatorFactory("maxIndex", "index"),
+                        new DoubleMinAggregatorFactory("minIndex", "index"),
+                        new DoubleFirstAggregatorFactory("first", "index", null)
+                    )
+                )
+            )
+        )
+        .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
+        .build();
+
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+
+    QueryRunner<Result<TopNResultValue>> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        new TopNQueryRunnerFactory(
+            defaultPool,
+            new TopNQueryQueryToolChest(new TopNQueryConfig()),
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        ),
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    List<Result<TopNResultValue>> expectedResults = ImmutableList.of(
+        new Result<>(
+            DateTimes.of("2020-04-02T00:00:00.000Z"),
+            new TopNResultValue(ImmutableList.of())
+        )
+    );
+    assertExpectedResultsWithCustomRunner(expectedResults, query, queryRunner);
+  }
+
+  @Test
+  public void testTopNLexicographicUnnest()
+  {
+    final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
+
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
+        .granularity(QueryRunnerTestHelper.ALL_GRAN)
+        .dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .metric(new DimensionTopNMetricSpec("", StringComparators.LEXICOGRAPHIC))
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(commonAggregators)
+        .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
+        .build();
+
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+
+    QueryRunner<Result<TopNResultValue>> queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        new TopNQueryRunnerFactory(
+            defaultPool,
+            new TopNQueryQueryToolChest(new TopNQueryConfig()),
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        ),
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "a",
+                        "rows", 2L,
+                        "index", 283.311029D,
+                        "addRowsIndexConstant", 286.311029D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_1
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "b",
+                        "rows", 2L,
+                        "index", 231.557367D,
+                        "addRowsIndexConstant", 234.557367D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_1
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "e",
+                        "rows", 2L,
+                        "index", 324.763273D,
+                        "addRowsIndexConstant", 327.763273D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_1
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "h",
+                        "rows", 2L,
+                        "index", 233.580712D,
+                        "addRowsIndexConstant", 236.580712D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_1
+                    )
+                )
+            )
+        )
+    );
+    assertExpectedResultsWithCustomRunner(expectedResults, query, queryRunner);
+  }
+
+  @Test
+  public void testTopNStringVirtualColumnUnnest()
+  {
+    final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
+
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(UnnestDataSource.create(
+            new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+            "vc",
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+            null
+        ))
+        .granularity(QueryRunnerTestHelper.ALL_GRAN)
+        .virtualColumns(
+            new ExpressionVirtualColumn(
+                "vc",
+                "mv_to_array(\"placementish\")",
+                ColumnType.STRING_ARRAY,
+                TestExprMacroTable.INSTANCE
+            )
+        )
+        .dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .metric("rows")
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(commonAggregators)
+        .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
+        .build();
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+
+    QueryRunner<Result<TopNResultValue>> vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        new TopNQueryRunnerFactory(
+            defaultPool,
+            new TopNQueryQueryToolChest(new TopNQueryConfig()),
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        ),
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "preferred",
+                        "rows", 26L,
+                        "index", 12459.361287D,
+                        "addRowsIndexConstant", 12486.361287D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_9
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "m",
+                        "rows", 6L,
+                        "index", 5320.717303D,
+                        "addRowsIndexConstant", 5327.717303D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_1
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "p",
+                        "rows", 6L,
+                        "index", 5407.213795D,
+                        "addRowsIndexConstant", 5414.213795D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_1
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "t",
+                        "rows", 4L,
+                        "index", 422.344086D,
+                        "addRowsIndexConstant", 427.344086D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_2
+                    )
+                )
+            )
+        )
+    );
+    assertExpectedResultsWithCustomRunner(expectedResults, query, vcrunner);
+  }
+
+  @Test
+  public void testTopNStringVirtualMultiColumnUnnest()
+  {
+    final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
+    final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
+        "TopNQueryRunnerFactory-bufferPool",
+        () -> ByteBuffer.allocate(20000)
+    );
+
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(UnnestDataSource.create(
+            new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+            "vc",
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+            null
+        ))
+        .granularity(QueryRunnerTestHelper.ALL_GRAN)
+        .virtualColumns(
+            new ExpressionVirtualColumn(
+                "vc",
+                "array(\"market\",\"quality\")",
+                ColumnType.STRING,
+                TestExprMacroTable.INSTANCE
+            )
+        )
+        .dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .metric("rows")
+        .threshold(2)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(commonAggregators)
+        .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
+        .build();
+
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+
+    QueryRunner<Result<TopNResultValue>> vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
+        new TopNQueryRunnerFactory(
+            defaultPool,
+            new TopNQueryQueryToolChest(new TopNQueryConfig()),
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        ),
+        new IncrementalIndexSegment(
+            rtIndex,
+            QueryRunnerTestHelper.SEGMENT_ID
+        ),
+        query,
+        "rtIndexvc"
+    );
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "spot",
+                        "rows", 18L,
+                        "index", 2231.876812D,
+                        "addRowsIndexConstant", 2250.876812D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_9
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "premium",
+                        "rows", 6L,
+                        "index", 5407.213795D,
+                        "addRowsIndexConstant", 5414.213795D,
+                        "uniques", QueryRunnerTestHelper.UNIQUES_1
+                    )
+                )
+            )
+        )
+    );
+    assertExpectedResultsWithCustomRunner(expectedResults, query, vcrunner);
+    RESOURCE_CLOSER.register(() -> {
+      // Verify that all objects have been returned to the pool.
+      Assert.assertEquals("defaultPool objects created", defaultPool.poolSize(), defaultPool.objectsCreatedCount());
+      Assert.assertEquals("customPool objects created", customPool.poolSize(), customPool.objectsCreatedCount());
+      defaultPool.close();
+      customPool.close();
+    });
+  }
+}
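The topN expectations above follow the same unnest-then-aggregate shape as the groupBy tests: aggregate per unnested value, then keep the top N by the chosen metric (row count here). A plain-Java approximation with illustrative data only, not the actual TestIndex contents:

// UnnestTopNSketch.java -- rough model of the asserted behavior, not Druid code.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class UnnestTopNSketch
{
  static List<Map.Entry<String, Long>> topN(List<List<String>> rows, int n)
  {
    // Count one output row per unnested value (the aggregation step)...
    Map<String, Long> counts = new HashMap<>();
    rows.forEach(row -> row.forEach(v -> counts.merge(v, 1L, Long::sum)));
    // ...then rank by the metric and keep the top N (the topN step).
    return counts.entrySet()
                 .stream()
                 .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                 .limit(n)
                 .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<List<String>> rows = Arrays.asList(
        Arrays.asList("a", "preferred"),
        Arrays.asList("b", "preferred"),
        Arrays.asList("m", "preferred")
    );
    System.out.println(topN(rows, 2)); // [preferred=3, then one of a/b/m with count 1]
  }
}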