Additional native query tests for unnest datasource (#13554)

Native tests for the unnest datasource.
somu-imply 2023-01-25 15:57:52 -08:00 committed by GitHub
parent 00cee329bd
commit 17c0167248
5 changed files with 1821 additions and 6 deletions
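The new tests exercise the unnest datasource through the native groupBy, scan, and topN engines. As a minimal sketch of the shared pattern (assuming the Druid test fixtures that appear in the diff below, such as QueryRunnerTestHelper, and the imports used in those test classes), an unnest datasource wraps a base table and explodes one multi-value column into a new output column that the query then treats like any other dimension:

    // Sketch only: the names mirror the constants added in this commit.
    // The multi-value "placementish" column is unnested into "placementish_unnest".
    DataSource unnested = UnnestDataSource.create(
        new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,        // column to unnest
        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, // name of the unnested column
        null                                                 // no allow-set: keep every value
    );

    GroupByQuery query = GroupByQuery.builder()
        .setDataSource(unnested)
        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
        .setDimensions(new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0"))
        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
        .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
        .build();

Since the 26 test rows each carry two placementish values, the unnested groupBy sees 26 * 2 = 52 rows in total, which is what the expected results in UnnestGroupByQueryRunnerTest below assert.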

View File

@ -63,6 +63,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.timeline.SegmentId;
@ -79,6 +80,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -100,6 +102,13 @@ public class QueryRunnerTestHelper
.collect(Collectors.toList())
);
public static final DataSource UNNEST_DATA_SOURCE = UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
);
public static final Granularity DAY_GRAN = Granularities.DAY;
public static final Granularity ALL_GRAN = Granularities.ALL;
public static final Granularity MONTH_GRAN = Granularities.MONTH;
@ -109,6 +118,7 @@ public class QueryRunnerTestHelper
public static final String PLACEMENT_DIMENSION = "placement";
public static final String PLACEMENTISH_DIMENSION = "placementish";
public static final String PARTIAL_NULL_DIMENSION = "partial_null_column";
public static final String PLACEMENTISH_DIMENSION_UNNEST = "placementish_unnest";
public static final List<String> DIMENSIONS = Lists.newArrayList(
MARKET_DIMENSION,
@ -353,6 +363,7 @@ public class QueryRunnerTestHelper
return !("rtIndex".equals(runnerName) || "noRollupRtIndex".equals(runnerName));
}
public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunners(
QueryRunnerFactory<T, QueryType> factory
)
@ -395,6 +406,7 @@ public class QueryRunnerTestHelper
);
}
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
String resourceFileName,
@ -467,6 +479,22 @@ public class QueryRunnerTestHelper
};
}
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunnerWithSegmentMapFn(
QueryRunnerFactory<T, QueryType> factory,
Segment adapter,
Query<T> query,
final String runnerName
)
{
final DataSource base = query.getDataSource();
final SegmentReference segmentReference = base.createSegmentMapFunction(query, new AtomicLong())
.apply(ReferenceCountingSegment.wrapRootGenerationSegment(
adapter));
return makeQueryRunner(factory, segmentReference, runnerName);
}
public static <T> QueryRunner<T> makeFilteringQueryRunner(
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
final QueryRunnerFactory<T, Query<T>> factory

View File

@ -0,0 +1,795 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.StringFormatExtractionFn;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest
{
private static TestGroupByBuffers BUFFER_POOLS = null;
private final GroupByQueryRunnerFactory factory;
private final GroupByQueryConfig config;
private final boolean vectorize;
@Rule
public ExpectedException expectedException = ExpectedException.none();
public UnnestGroupByQueryRunnerTest(
GroupByQueryConfig config,
GroupByQueryRunnerFactory factory,
boolean vectorize
)
{
this.config = config;
this.factory = factory;
this.vectorize = vectorize;
}
public static List<GroupByQueryConfig> testConfigs()
{
final GroupByQueryConfig v2Config = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V2;
}
@Override
public int getBufferGrouperInitialBuckets()
{
// Small initial table to force some growing.
return 4;
}
@Override
public String toString()
{
return "v2";
}
};
return ImmutableList.of(
v2Config
);
}
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools
)
{
return makeQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
config,
bufferPools,
GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG
);
}
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools
)
{
return makeQueryRunnerFactory(mapper, config, bufferPools, GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG);
}
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools,
final DruidProcessingConfig processingConfig
)
{
if (bufferPools.getBufferSize() != processingConfig.intermediateComputeSizeBytes()) {
throw new ISE(
"Provided buffer size [%,d] does not match configured size [%,d]",
bufferPools.getBufferSize(),
processingConfig.intermediateComputeSizeBytes()
);
}
if (bufferPools.getNumMergeBuffers() != processingConfig.getNumMergeBuffers()) {
throw new ISE(
"Provided merge buffer count [%,d] does not match configured count [%,d]",
bufferPools.getNumMergeBuffers(),
processingConfig.getNumMergeBuffers()
);
}
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPools.getProcessingPool()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new GroupByStrategyV2(
processingConfig,
configSupplier,
bufferPools.getProcessingPool(),
bufferPools.getMergePool(),
TestHelper.makeJsonMapper(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector);
return new GroupByQueryRunnerFactory(strategySelector, toolChest);
}
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
NullHandling.initializeForTests();
setUpClass();
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : testConfigs()) {
final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS);
for (boolean vectorize : ImmutableList.of(false)) {
// Add vectorization tests for any indexes that support it.
if (!vectorize ||
config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
constructors.add(new Object[]{config, factory, vectorize});
}
}
}
return constructors;
}
@BeforeClass
public static void setUpClass()
{
if (BUFFER_POOLS == null) {
BUFFER_POOLS = TestGroupByBuffers.createDefault();
}
}
@AfterClass
public static void tearDownClass()
{
BUFFER_POOLS.close();
BUFFER_POOLS = null;
}
private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals)
{
return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);
}
private static ResultRow makeRow(final GroupByQuery query, final DateTime timestamp, final Object... vals)
{
return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);
}
private static List<ResultRow> makeRows(
final GroupByQuery query,
final String[] columnNames,
final Object[]... values
)
{
return GroupByQueryRunnerTestHelper.createExpectedRows(query, columnNames, values);
}
@Test
public void testGroupBy()
{
GroupByQuery query = makeQueryBuilder()
.setDataSource(UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
))
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("idx", "index")
)
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
List<ResultRow> expectedResults = Arrays.asList(
makeRow(
query,
"2011-04-01",
"alias",
"automotive",
"rows",
2L,
"idx",
270L
),
makeRow(
query,
"2011-04-01",
"alias",
"business",
"rows",
2L,
"idx",
236L
),
makeRow(
query,
"2011-04-01",
"alias",
"entertainment",
"rows",
2L,
"idx",
316L
),
makeRow(
query,
"2011-04-01",
"alias",
"health",
"rows",
2L,
"idx",
240L
),
makeRow(
query,
"2011-04-01",
"alias",
"mezzanine",
"rows",
6L,
"idx",
5740L
),
makeRow(
query,
"2011-04-01",
"alias",
"news",
"rows",
2L,
"idx",
242L
),
makeRow(
query,
"2011-04-01",
"alias",
"premium",
"rows",
6L,
"idx",
5800L
),
makeRow(
query,
"2011-04-01",
"alias",
"technology",
"rows",
2L,
"idx",
156L
),
makeRow(
query,
"2011-04-01",
"alias",
"travel",
"rows",
2L,
"idx",
238L
),
makeRow(
query,
"2011-04-02",
"alias",
"automotive",
"rows",
2L,
"idx",
294L
),
makeRow(
query,
"2011-04-02",
"alias",
"business",
"rows",
2L,
"idx",
224L
),
makeRow(
query,
"2011-04-02",
"alias",
"entertainment",
"rows",
2L,
"idx",
332L
),
makeRow(
query,
"2011-04-02",
"alias",
"health",
"rows",
2L,
"idx",
226L
),
makeRow(
query,
"2011-04-02",
"alias",
"mezzanine",
"rows",
6L,
"idx",
4894L
),
makeRow(
query,
"2011-04-02",
"alias",
"news",
"rows",
2L,
"idx",
228L
),
makeRow(
query,
"2011-04-02",
"alias",
"premium",
"rows",
6L,
"idx",
5010L
),
makeRow(
query,
"2011-04-02",
"alias",
"technology",
"rows",
2L,
"idx",
194L
),
makeRow(
query,
"2011-04-02",
"alias",
"travel",
"rows",
2L,
"idx",
252L
)
);
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
factory,
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}
@Test
public void testGroupByOnMissingColumn()
{
// Cannot vectorize due to extraction dimension spec.
cannotVectorize();
GroupByQuery query = makeQueryBuilder()
.setDataSource(UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
))
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(
new DefaultDimensionSpec("nonexistent0", "alias0"),
new ExtractionDimensionSpec("nonexistent1", "alias1", new StringFormatExtractionFn("foo"))
).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.build();
List<ResultRow> expectedResults = Collections.singletonList(
makeRow(
query,
"2011-04-01",
"alias0", null,
"alias1", "foo",
"rows", 52L
)
);
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
factory,
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "missing-column");
}
@Test
public void testGroupByOnUnnestedColumn()
{
cannotVectorize();
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(
new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0")
).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.build();
// Total rows should add up to 26 * 2 = 52
// 26 rows and each has 2 entries in the column to be unnested
List<ResultRow> expectedResults = Arrays.asList(
makeRow(
query,
"2011-04-01",
"alias0", "a",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "preferred",
"rows", 26L
),
makeRow(
query,
"2011-04-01",
"alias0", "b",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "e",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "h",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "m",
"rows", 6L
),
makeRow(
query,
"2011-04-01",
"alias0", "n",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "p",
"rows", 6L
),
makeRow(
query,
"2011-04-01",
"alias0", "t",
"rows", 4L
)
);
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
factory,
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy-on-unnested-column");
}
@Test
public void testGroupByOnUnnestedVirtualColumn()
{
cannotVectorize();
final DataSource unnestDataSource = UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
"vc",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
);
GroupByQuery query = makeQueryBuilder()
.setDataSource(unnestDataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(
new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0")
).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.setVirtualColumns(
new ExpressionVirtualColumn(
"vc",
"mv_to_array(placementish)",
ColumnType.STRING_ARRAY,
TestExprMacroTable.INSTANCE
)
)
.addOrderByColumn("alias0", OrderByColumnSpec.Direction.ASCENDING)
.build();
// Total rows should add up to 26 * 2 = 52
// 26 rows and each has 2 entries in the column to be unnested
List<ResultRow> expectedResults = Arrays.asList(
makeRow(
query,
"2011-04-01",
"alias0", "preferred",
"rows", 26L
),
makeRow(
query,
"2011-04-01",
"alias0", "e",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "b",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "h",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "a",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "m",
"rows", 6L
),
makeRow(
query,
"2011-04-01",
"alias0", "n",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "p",
"rows", 6L
),
makeRow(
query,
"2011-04-01",
"alias0", "t",
"rows", 4L
)
);
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
factory,
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy-on-unnested-virtual-column");
}
@Test
public void testGroupByOnUnnestedVirtualMultiColumn()
{
cannotVectorize();
final DataSource unnestDataSource = UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
"vc",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
);
GroupByQuery query = makeQueryBuilder()
.setDataSource(unnestDataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(
new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0")
).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.setVirtualColumns(
new ExpressionVirtualColumn(
"vc",
"array(\"market\",\"quality\")",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
)
)
.setLimit(3)
.build();
// Total rows should add up to 26 * 2 = 52
// 26 rows and each has 2 entries in the column to be unnested
List<ResultRow> expectedResults = Arrays.asList(
makeRow(
query,
"2011-04-01",
"alias0", "business",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "health",
"rows", 2L
),
makeRow(
query,
"2011-04-01",
"alias0", "travel",
"rows", 2L
)
);
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
factory,
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, queryRunner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy-on-unnested-virtual-columns");
}
/**
* Use this method instead of GroupByQuery.builder() to make sure the context is set properly. Also, avoid
* setContext in tests. Only use overrideContext.
*/
private GroupByQuery.Builder makeQueryBuilder()
{
return GroupByQuery.builder().overrideContext(makeContext());
}
/**
* Use this method instead of new GroupByQuery.Builder(query) to make sure the context is set properly. Also, avoid
* setContext in tests. Only use overrideContext.
*/
private GroupByQuery.Builder makeQueryBuilder(final GroupByQuery query)
{
return new GroupByQuery.Builder(query).overrideContext(makeContext());
}
private Map<String, Object> makeContext()
{
return ImmutableMap.<String, Object>builder()
.put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false")
.put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false")
.put("vectorSize", 16) // Small vector size to ensure we use more than one.
.build();
}
private void cannotVectorize()
{
if (vectorize && config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("Cannot vectorize!");
}
}
}

View File

@ -284,6 +284,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
V_0112_0114
);
@ -334,6 +335,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
V_0112_0114
),
legacy ? Lists.newArrayList(getTimestampName(), "market", "index") : Lists.newArrayList("market", "index"),
@ -371,6 +373,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
V_0112_0114
),
legacy ? Lists.newArrayList(getTimestampName(), "market", "index") : Lists.newArrayList("market", "index"),
@ -403,6 +406,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
// filtered values with day granularity
new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
@ -466,6 +470,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
// filtered values with day granularity
new String[]{
"2011-01-12T00:00:00.000Z\ttotal_market\tmezzanine\tpreferred\tmpreferred\t1000.000000",
@ -529,6 +534,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
final List<List<Map<String, Object>>> events = toEvents(
legacy ? new String[]{getTimestampName() + ":TIME"} : new String[0],
legacy,
V_0112_0114
);
@ -592,6 +598,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
@ -681,6 +688,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
expectedRet
);
if (legacy) {
@ -769,6 +777,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
if (legacy) {
@ -861,6 +870,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
null,
QueryRunnerTestHelper.INDEX_METRIC + ":DOUBLE"
},
legacy,
expectedRet //segments in reverse order from above
);
if (legacy) {
@ -1008,11 +1018,12 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
"indexMaxFloat",
"quality_uniques"
},
legacy,
valueSet
);
}
private List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, final String[]... valueSet)
public static List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, boolean legacy, final String[]... valueSet)
{
List<String> values = new ArrayList<>();
for (String[] vSet : valueSet) {
@ -1074,13 +1085,30 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
if (specs.length == 1 || specs[1].equals("STRING")) {
eventVal = values1[i];
} else if (specs[1].equals("TIME")) {
eventVal = toTimestamp(values1[i]);
eventVal = toTimestamp(values1[i], legacy);
} else if (specs[1].equals("FLOAT")) {
eventVal = values1[i].isEmpty() ? NullHandling.defaultFloatValue() : Float.valueOf(values1[i]);
try {
eventVal = values1[i].isEmpty() ? NullHandling.defaultFloatValue() : Float.valueOf(values1[i]);
}
catch (NumberFormatException nfe) {
throw new ISE("This object cannot be converted to a Float!");
}
} else if (specs[1].equals("DOUBLE")) {
eventVal = values1[i].isEmpty() ? NullHandling.defaultDoubleValue() : Double.valueOf(values1[i]);
try {
eventVal = values1[i].isEmpty()
? NullHandling.defaultDoubleValue()
: Double.valueOf(values1[i]);
}
catch (NumberFormatException nfe) {
throw new ISE("This object cannot be converted to a Double!");
}
} else if (specs[1].equals("LONG")) {
eventVal = values1[i].isEmpty() ? NullHandling.defaultLongValue() : Long.valueOf(values1[i]);
try {
eventVal = values1[i].isEmpty() ? NullHandling.defaultLongValue() : Long.valueOf(values1[i]);
}
catch (NumberFormatException nfe) {
throw new ISE("This object cannot be converted to a Long!");
}
} else if (specs[1].equals(("NULL"))) {
eventVal = null;
} else if (specs[1].equals("STRINGS")) {
@ -1099,7 +1127,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
return events;
}
private Object toTimestamp(final String value)
private static Object toTimestamp(final String value, boolean legacy)
{
if (legacy) {
return DateTimes.of(value);
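toEvents and toTimestamp become static here (with the legacy flag passed in explicitly) so that the new UnnestScanQueryRunnerTest below can reuse the same expected-event builder instead of duplicating it. A minimal sketch of that reuse, assuming columnNames, legacy, and values are set up as in the tests that follow:

    // Sketch only: build the expected scan events from tab-separated value strings.
    final List<List<Map<String, Object>>> events =
        ScanQueryRunnerTest.toEvents(columnNames, legacy, values);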

View File

@ -0,0 +1,551 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class UnnestScanQueryRunnerTest extends InitializedNullHandlingTest
{
public static final QuerySegmentSpec I_0112_0114 = ScanQueryRunnerTest.I_0112_0114;
private static final ScanQueryQueryToolChest TOOL_CHEST = new ScanQueryQueryToolChest(
new ScanQueryConfig(),
DefaultGenericQueryMetricsFactory.instance()
);
private static final ScanQueryRunnerFactory FACTORY = new ScanQueryRunnerFactory(
TOOL_CHEST,
new ScanQueryEngine(),
new ScanQueryConfig()
);
private final IncrementalIndex index;
private final boolean legacy;
public UnnestScanQueryRunnerTest(final IncrementalIndex index, final boolean legacy)
{
this.index = index;
this.legacy = legacy;
}
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
NullHandling.initializeForTests();
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final List<Object[]> constructors = new ArrayList<>();
constructors.add(new Object[]{rtIndex, true});
constructors.add(new Object[]{rtIndex, false});
return constructors;
}
private Druids.ScanQueryBuilder newTestUnnestQuery()
{
return Druids.newScanQueryBuilder()
.dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
.columns(Collections.emptyList())
.eternityInterval()
.limit(3)
.legacy(legacy);
}
private Druids.ScanQueryBuilder newTestUnnestQueryWithAllowSet()
{
List<String> allowList = Arrays.asList("a", "b", "c");
LinkedHashSet allowSet = new LinkedHashSet(allowList);
return Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
allowSet
))
.columns(Collections.emptyList())
.eternityInterval()
.limit(3)
.legacy(legacy);
}
@Test
public void testScanOnUnnest()
{
ScanQuery query = newTestUnnestQuery()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.limit(3)
.build();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
FACTORY,
new IncrementalIndexSegment(
index,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
String[] columnNames;
if (legacy) {
columnNames = new String[]{
getTimestampName() + ":TIME",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
} else {
columnNames = new String[]{
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
}
String[] values;
if (legacy) {
values = new String[]{
"2011-01-12T00:00:00.000Z\ta",
"2011-01-12T00:00:00.000Z\tpreferred",
"2011-01-12T00:00:00.000Z\tb"
};
} else {
values = new String[]{
"a",
"preferred",
"b"
};
}
final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
List<ScanResultValue> expectedResults = toExpected(
events,
legacy
? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
: Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
0,
3
);
ScanQueryRunnerTest.verify(expectedResults, results);
}
@Test
public void testUnnestRunnerVirtualColumnsUsingSingleColumn()
{
ScanQuery query =
Druids.newScanQueryBuilder()
.intervals(I_0112_0114)
.dataSource(UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
"vc",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
))
.columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.eternityInterval()
.legacy(legacy)
.virtualColumns(
new ExpressionVirtualColumn(
"vc",
"mv_to_array(placementish)",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
)
)
.limit(3)
.build();
QueryRunner vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
FACTORY,
new IncrementalIndexSegment(
index,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ScanResultValue> results = vcrunner.run(QueryPlus.wrap(query)).toList();
String[] columnNames;
if (legacy) {
columnNames = new String[]{
getTimestampName() + ":TIME",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
} else {
columnNames = new String[]{
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
}
String[] values;
if (legacy) {
values = new String[]{
"2011-01-12T00:00:00.000Z\ta",
"2011-01-12T00:00:00.000Z\tpreferred",
"2011-01-12T00:00:00.000Z\tb"
};
} else {
values = new String[]{
"a",
"preferred",
"b"
};
}
final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
List<ScanResultValue> expectedResults = toExpected(
events,
legacy
? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
: Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
0,
3
);
ScanQueryRunnerTest.verify(expectedResults, results);
}
@Test
public void testUnnestRunnerVirtualColumnsUsingMultipleColumn()
{
ScanQuery query =
Druids.newScanQueryBuilder()
.intervals(I_0112_0114)
.dataSource(UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
"vc",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
))
.columns(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.eternityInterval()
.legacy(legacy)
.virtualColumns(
new ExpressionVirtualColumn(
"vc",
"array(\"market\",\"quality\")",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
)
)
.limit(4)
.build();
QueryRunner vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
FACTORY,
new IncrementalIndexSegment(
index,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ScanResultValue> results = vcrunner.run(QueryPlus.wrap(query)).toList();
String[] columnNames;
if (legacy) {
columnNames = new String[]{
getTimestampName() + ":TIME",
QueryRunnerTestHelper.MARKET_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
} else {
columnNames = new String[]{
QueryRunnerTestHelper.MARKET_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
}
String[] values;
if (legacy) {
values = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tspot",
"2011-01-12T00:00:00.000Z\tspot\tautomotive",
"2011-01-12T00:00:00.000Z\tspot\tspot",
"2011-01-12T00:00:00.000Z\tspot\tbusiness",
};
} else {
values = new String[]{
"spot\tspot",
"spot\tautomotive",
"spot\tspot",
"spot\tbusiness"
};
}
final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
List<ScanResultValue> expectedResults = toExpected(
events,
legacy
? Lists.newArrayList(
getTimestampName(),
QueryRunnerTestHelper.MARKET_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
)
: Lists.newArrayList(
QueryRunnerTestHelper.MARKET_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
),
0,
4
);
ScanQueryRunnerTest.verify(expectedResults, results);
}
@Test
public void testUnnestRunnerWithFilter()
{
ScanQuery query = newTestUnnestQuery()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.limit(3)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.MARKET_DIMENSION, "spot", null))
.build();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
FACTORY,
new IncrementalIndexSegment(
index,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
String[] columnNames;
if (legacy) {
columnNames = new String[]{
getTimestampName() + ":TIME",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
} else {
columnNames = new String[]{
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
}
String[] values;
if (legacy) {
values = new String[]{
"2011-01-12T00:00:00.000Z\ta",
"2011-01-12T00:00:00.000Z\tpreferred",
"2011-01-12T00:00:00.000Z\tb"
};
} else {
values = new String[]{
"a",
"preferred",
"b"
};
}
final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
List<ScanResultValue> expectedResults = toExpected(
events,
legacy
? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
: Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
0,
3
);
ScanQueryRunnerTest.verify(expectedResults, results);
}
@Test
public void testUnnestRunnerWithOrdering()
{
ScanQuery query = newTestUnnestQuery()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.TIME_DIMENSION, QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.limit(3)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.MARKET_DIMENSION, "spot", null))
.order(ScanQuery.Order.ASCENDING)
.build();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
FACTORY,
new IncrementalIndexSegment(
index,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
String[] columnNames;
if (legacy) {
columnNames = new String[]{
getTimestampName() + ":TIME",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
} else {
columnNames = new String[]{
ColumnHolder.TIME_COLUMN_NAME,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
}
String[] values;
values = new String[]{
"2011-01-12T00:00:00.000Z\ta",
"2011-01-12T00:00:00.000Z\tpreferred",
"2011-01-12T00:00:00.000Z\tb"
};
final List<List<Map<String, Object>>> ascendingEvents = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
if (legacy) {
for (List<Map<String, Object>> batch : ascendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
}
}
} else {
for (List<Map<String, Object>> batch : ascendingEvents) {
for (Map<String, Object> event : batch) {
event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis());
}
}
}
List<ScanResultValue> ascendingExpectedResults = toExpected(
ascendingEvents,
legacy ?
Lists.newArrayList(
QueryRunnerTestHelper.TIME_DIMENSION,
getTimestampName(),
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
) :
Lists.newArrayList(
QueryRunnerTestHelper.TIME_DIMENSION,
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
),
0,
3
);
ScanQueryRunnerTest.verify(ascendingExpectedResults, results);
}
@Test
public void testUnnestRunnerNonNullAllowSet()
{
ScanQuery query = newTestUnnestQueryWithAllowSet()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.limit(3)
.build();
final QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
FACTORY,
new IncrementalIndexSegment(
index,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
Iterable<ScanResultValue> results = queryRunner.run(QueryPlus.wrap(query)).toList();
String[] columnNames;
if (legacy) {
columnNames = new String[]{
getTimestampName() + ":TIME",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
} else {
columnNames = new String[]{
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
};
}
String[] values;
if (legacy) {
values = new String[]{
"2011-01-12T00:00:00.000Z\ta",
"2011-01-12T00:00:00.000Z\tb",
"2011-01-13T00:00:00.000Z\ta"
};
} else {
values = new String[]{
"a",
"b",
"a"
};
}
final List<List<Map<String, Object>>> events = ScanQueryRunnerTest.toEvents(columnNames, legacy, values);
List<ScanResultValue> expectedResults = toExpected(
events,
legacy
? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
: Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
0,
3
);
ScanQueryRunnerTest.verify(expectedResults, results);
}
private String getTimestampName()
{
return legacy ? "timestamp" : ColumnHolder.TIME_COLUMN_NAME;
}
private List<ScanResultValue> toExpected(
List<List<Map<String, Object>>> targets,
List<String> columns,
final int offset,
final int limit
)
{
List<ScanResultValue> expected = Lists.newArrayListWithExpectedSize(targets.size());
for (List<Map<String, Object>> group : targets) {
List<Map<String, Object>> events = Lists.newArrayListWithExpectedSize(limit);
int end = Math.min(group.size(), offset + limit);
if (end == 0) {
end = group.size();
}
events.addAll(group.subList(offset, end));
expected.add(new ScanResultValue(QueryRunnerTestHelper.SEGMENT_ID.toString(), columns, events));
}
return expected;
}
}

View File

@ -0,0 +1,413 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.TestQueryRunners;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
*
*/
@RunWith(Parameterized.class)
public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
{
private static final Closer RESOURCE_CLOSER = Closer.create();
private final List<AggregatorFactory> commonAggregators;
public UnnestTopNQueryRunnerTest(
List<AggregatorFactory> commonAggregators
)
{
this.commonAggregators = commonAggregators;
}
@AfterClass
public static void teardown() throws IOException
{
RESOURCE_CLOSER.close();
}
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
List<Object[]> constructors = new ArrayList<>();
constructors.add(new Object[]{QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS});
return constructors;
}
private Sequence<Result<TopNResultValue>> assertExpectedResultsWithCustomRunner(
Iterable<Result<TopNResultValue>> expectedResults,
TopNQuery query,
QueryRunner runner
)
{
final Sequence<Result<TopNResultValue>> retval = runWithMerge(query, runner);
TestHelper.assertExpectedResults(expectedResults, retval);
return retval;
}
private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query, QueryRunner runner)
{
return runWithMerge(query, ResponseContext.createEmpty(), runner);
}
private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query, ResponseContext context, QueryRunner runner1)
{
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(new TopNQueryConfig());
final QueryRunner<Result<TopNResultValue>> mergeRunner = new FinalizeResultsQueryRunner(
chest.mergeResults(runner1),
chest
);
return mergeRunner.run(QueryPlus.wrap(query), context);
}
@Test
public void testEmptyTopN()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
.metric(QueryRunnerTestHelper.INDEX_METRIC)
.threshold(4)
.intervals(QueryRunnerTestHelper.EMPTY_INTERVAL)
.aggregators(
Lists.newArrayList(
Iterables.concat(
commonAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index"),
new DoubleFirstAggregatorFactory("first", "index", null)
)
)
)
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.build();
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
List<Result<TopNResultValue>> expectedResults = ImmutableList.of(
new Result<>(
DateTimes.of("2020-04-02T00:00:00.000Z"),
new TopNResultValue(ImmutableList.of())
)
);
assertExpectedResultsWithCustomRunner(expectedResults, query, queryRunner);
}
@Test
public void testTopNLexicographicUnnest()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.metric(new DimensionTopNMetricSpec("", StringComparators.LEXICOGRAPHIC))
.threshold(4)
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.aggregators(commonAggregators)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.build();
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
QueryRunner queryRunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "a",
"rows", 2L,
"index", 283.311029D,
"addRowsIndexConstant", 286.311029D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "b",
"rows", 2L,
"index", 231.557367D,
"addRowsIndexConstant", 234.557367D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "e",
"rows", 2L,
"index", 324.763273D,
"addRowsIndexConstant", 327.763273D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "h",
"rows", 2L,
"index", 233.580712D,
"addRowsIndexConstant", 236.580712D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
)
)
)
)
);
assertExpectedResultsWithCustomRunner(expectedResults, query, queryRunner);
}
@Test
public void testTopNStringVirtualColumnUnnest()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
TopNQuery query = new TopNQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
"vc",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
))
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.virtualColumns(
new ExpressionVirtualColumn(
"vc",
"mv_to_array(\"placementish\")",
ColumnType.STRING_ARRAY,
TestExprMacroTable.INSTANCE
)
)
.dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.metric("rows")
.threshold(4)
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.aggregators(commonAggregators)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.build();
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
QueryRunner vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "preferred",
"rows", 26L,
"index", 12459.361287D,
"addRowsIndexConstant", 12486.361287D,
"uniques", QueryRunnerTestHelper.UNIQUES_9
),
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "m",
"rows", 6L,
"index", 5320.717303D,
"addRowsIndexConstant", 5327.717303D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "p",
"rows", 6L,
"index", 5407.213795D,
"addRowsIndexConstant", 5414.213795D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
),
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "t",
"rows", 4L,
"index", 422.344086D,
"addRowsIndexConstant", 427.344086D,
"uniques", QueryRunnerTestHelper.UNIQUES_2
)
)
)
)
);
assertExpectedResultsWithCustomRunner(expectedResults, query, vcrunner);
}
@Test
public void testTopNStringVirtualMultiColumnUnnest()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(20000)
);
TopNQuery query = new TopNQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
"vc",
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
null
))
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.virtualColumns(
new ExpressionVirtualColumn(
"vc",
"array(\"market\",\"quality\")",
ColumnType.STRING,
TestExprMacroTable.INSTANCE
)
)
.dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
.metric("rows")
.threshold(2)
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.aggregators(commonAggregators)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.build();
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
QueryRunner vcrunner = QueryRunnerTestHelper.makeQueryRunnerWithSegmentMapFn(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new IncrementalIndexSegment(
rtIndex,
QueryRunnerTestHelper.SEGMENT_ID
),
query,
"rtIndexvc"
);
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "spot",
"rows", 18L,
"index", 2231.876812D,
"addRowsIndexConstant", 2250.876812D,
"uniques", QueryRunnerTestHelper.UNIQUES_9
),
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "premium",
"rows", 6L,
"index", 5407.213795D,
"addRowsIndexConstant", 5414.213795D,
"uniques", QueryRunnerTestHelper.UNIQUES_1
)
)
)
)
);
assertExpectedResultsWithCustomRunner(expectedResults, query, vcrunner);
RESOURCE_CLOSER.register(() -> {
// Verify that all objects have been returned to the pool.
Assert.assertEquals("defaultPool objects created", defaultPool.poolSize(), defaultPool.objectsCreatedCount());
Assert.assertEquals("customPool objects created", customPool.poolSize(), customPool.objectsCreatedCount());
defaultPool.close();
customPool.close();
});
}
}