Enable multiple distinct aggregators in same query (#11014)

* Enable multiple distinct count

* Add more tests

* fix sql test

* docs fix

* Address nits
This commit is contained in:
Abhishek Agarwal 2021-04-07 13:22:19 +05:30 committed by GitHub
parent d28d4e8ed3
commit 0df0bff44b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 396 additions and 104 deletions

View File

@ -1709,6 +1709,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
|`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true|
|`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when `useApproximateCountDistinct` is disabled. If set to true, exact distinct queries are re-written using grouping sets. Otherwise, exact distinct queries are re-written using joins. This should be set to true for group by query with multiple exact distinct aggregations. This flag can be overridden per query.|false|
|`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used instead.|true|
|`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on __time column so that all generated native queries will have user specified intervals. If true, all queries without filter condition on __time column will fail|false|
|`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|

View File

@ -322,7 +322,7 @@ Only the COUNT aggregation can accept DISTINCT.
|Function|Notes|
|--------|-----|
|`COUNT(*)`|Counts the number of rows.|
|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query is permitted.|
|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query is permitted unless `useGroupingSetForExactDistinct` is set to true in query contexts or broker configurations.|
|`SUM(expr)`|Sums numbers.|
|`MIN(expr)`|Takes the minimum of numbers.|
|`MAX(expr)`|Takes the maximum of numbers.|
@ -1015,6 +1015,7 @@ Connection context can be specified as JDBC connection properties or as a "conte
|`sqlQueryId`|Unique identifier given to this SQL query. For HTTP client, it will be returned in `X-Druid-SQL-Query-Id` header.|auto-generated|
|`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|druid.sql.planner.sqlTimeZone on the Broker (default: UTC)|
|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)|
|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|druid.sql.planner.useGroupingSetForExactDistinct on the Broker (default: false)|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)|
## Metadata tables

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.groupby.strategy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
@ -85,8 +86,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
public static final String CTX_KEY_OUTERMOST = "groupByOutermost";
// see countRequiredMergeBufferNum() for explanation
private static final int MAX_MERGE_BUFFER_NUM = 2;
// see countRequiredMergeBufferNumWithoutSubtotal() for explanation
private static final int MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL = 2;
private final DruidProcessingConfig processingConfig;
private final Supplier<GroupByQueryConfig> configSupplier;
@ -116,8 +117,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
@Override
public GroupByQueryResource prepareResource(GroupByQuery query)
{
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) +
numMergeBuffersNeededForSubtotalsSpec(query);
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query);
if (requiredMergeBufferNum > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
@ -146,7 +146,13 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
private static int countRequiredMergeBufferNum(Query query, int foundNum)
@VisibleForTesting
public static int countRequiredMergeBufferNum(GroupByQuery query)
{
return countRequiredMergeBufferNumWithoutSubtotal(query, 1) + numMergeBuffersNeededForSubtotalsSpec(query);
}
private static int countRequiredMergeBufferNumWithoutSubtotal(Query query, int foundNum)
{
// Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one.
// For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1.
@ -156,10 +162,10 @@ public class GroupByStrategyV2 implements GroupByStrategy
// This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2.
final DataSource dataSource = query.getDataSource();
if (foundNum == MAX_MERGE_BUFFER_NUM + 1 || !(dataSource instanceof QueryDataSource)) {
if (foundNum == MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL + 1 || !(dataSource instanceof QueryDataSource)) {
return foundNum - 1;
} else {
return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), foundNum + 1);
return countRequiredMergeBufferNumWithoutSubtotal(((QueryDataSource) dataSource).getQuery(), foundNum + 1);
}
}
@ -522,11 +528,20 @@ public class GroupByStrategyV2 implements GroupByStrategy
return aggsAndPostAggs;
}
private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
private static int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
{
List<List<String>> subtotalSpecs = query.getSubtotalsSpec();
final DataSource dataSource = query.getDataSource();
int numMergeBuffersNeededForSubQuerySubtotal = 0;
if (dataSource instanceof QueryDataSource) {
Query<?> subQuery = ((QueryDataSource) dataSource).getQuery();
if (subQuery instanceof GroupByQuery) {
numMergeBuffersNeededForSubQuerySubtotal = numMergeBuffersNeededForSubtotalsSpec((GroupByQuery) subQuery);
}
}
if (subtotalSpecs == null || subtotalSpecs.size() == 0) {
return 0;
return numMergeBuffersNeededForSubQuerySubtotal;
}
List<String> queryDimOutputNames = query.getDimensions().stream().map(DimensionSpec::getOutputName).collect(
@ -537,7 +552,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
return 1;
return Math.max(1, numMergeBuffersNeededForSubQuerySubtotal);
}
@Override

View File

@ -51,6 +51,7 @@ import org.junit.runners.Parameterized.Parameters;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -109,7 +110,7 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
@Override
public int getNumMergeBuffers()
{
return 3;
return 4;
}
@Override
@ -211,10 +212,11 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(0, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
@ -239,10 +241,11 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
@ -278,11 +281,12 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
// This should be 0 because the broker needs 2 buffers and the queryable node needs one.
Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
// This should be 1 because the broker needs 2 buffers and the queryable node needs one.
Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
@ -331,10 +335,157 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
// This should be 0 because the broker needs 2 buffers and the queryable node needs one.
// This should be 1 because the broker needs 2 buffers and the queryable node needs one.
Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
public void testSimpleGroupByWithSubtotals()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setSubtotalsSpec(Arrays.asList(
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION),
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
// 1 for subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
public void testSimpleGroupByWithSubtotalsWithoutPrefixMatch()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setSubtotalsSpec(Arrays.asList(
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION),
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
// 2 needed by subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
public void testNestedGroupByWithSubtotals()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setGranularity(Granularities.ALL)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market"),
DefaultDimensionSpec.of("placement")
))
.setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
.build()
)
)
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market")
))
.setSubtotalsSpec(Arrays.asList(
Collections.singletonList("quality"),
Collections.singletonList("market")
))
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
// 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
public void testNestedGroupByWithNestedSubtotals()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setGranularity(Granularities.ALL)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market"),
DefaultDimensionSpec.of("placement")
))
.setSubtotalsSpec(Arrays.asList(
Collections.singletonList("quality"),
Collections.singletonList("market")
))
.setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
.build()
)
)
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market")
))
.setSubtotalsSpec(Arrays.asList(
Collections.singletonList("quality"),
Collections.singletonList("market")
))
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();
Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
// 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
}

View File

@ -139,7 +139,10 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig(),
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
QueryStackTests.getProcessingConfig(
USE_PARALLEL_MERGE_POOL_CONFIGURED,
DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())

View File

@ -180,7 +180,10 @@ public class QueryStackTests
);
}
public static DruidProcessingConfig getProcessingConfig(boolean useParallelMergePoolConfigured)
public static DruidProcessingConfig getProcessingConfig(
boolean useParallelMergePoolConfigured,
final int mergeBuffers
)
{
return new DruidProcessingConfig()
{
@ -206,9 +209,10 @@ public class QueryStackTests
@Override
public int getNumMergeBuffers()
{
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
// Two buffers for the broker and one for the queryable.
return 3;
if (mergeBuffers == DEFAULT_NUM_MERGE_BUFFERS) {
return 2;
}
return mergeBuffers;
}
@Override
@ -230,9 +234,15 @@ public class QueryStackTests
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
final Closer closer,
final boolean useParallelMergePoolConfigured
)
{
return createQueryRunnerFactoryConglomerate(closer, getProcessingConfig(useParallelMergePoolConfigured));
return createQueryRunnerFactoryConglomerate(closer,
getProcessingConfig(
useParallelMergePoolConfigured,
DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
)
);
}
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(

View File

@ -30,6 +30,7 @@ import java.util.Objects;
public class PlannerConfig
{
public static final String CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT = "useApproximateCountDistinct";
public static final String CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT = "useGroupingSetForExactDistinct";
public static final String CTX_KEY_USE_APPROXIMATE_TOPN = "useApproximateTopN";
@JsonProperty
@ -59,6 +60,9 @@ public class PlannerConfig
@JsonProperty
private long metadataSegmentPollPeriod = 60000;
@JsonProperty
private boolean useGroupingSetForExactDistinct = false;
public long getMetadataSegmentPollPeriod()
{
return metadataSegmentPollPeriod;
@ -86,6 +90,11 @@ public class PlannerConfig
return useApproximateCountDistinct;
}
public boolean isUseGroupingSetForExactDistinct()
{
return useGroupingSetForExactDistinct;
}
public boolean isUseApproximateTopN()
{
return useApproximateTopN;
@ -125,6 +134,11 @@ public class PlannerConfig
CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT,
isUseApproximateCountDistinct()
);
newConfig.useGroupingSetForExactDistinct = getContextBoolean(
context,
CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT,
isUseGroupingSetForExactDistinct()
);
newConfig.useApproximateTopN = getContextBoolean(
context,
CTX_KEY_USE_APPROXIMATE_TOPN,

View File

@ -268,9 +268,11 @@ public class Rules
rules.addAll(ABSTRACT_RELATIONAL_RULES);
if (!plannerConfig.isUseApproximateCountDistinct()) {
// For some reason, even though we support grouping sets, using AggregateExpandDistinctAggregatesRule.INSTANCE
// here causes CalciteQueryTest#testExactCountDistinctWithGroupingAndOtherAggregators to fail.
rules.add(AggregateExpandDistinctAggregatesRule.JOIN);
if (plannerConfig.isUseGroupingSetForExactDistinct()) {
rules.add(AggregateExpandDistinctAggregatesRule.INSTANCE);
} else {
rules.add(AggregateExpandDistinctAggregatesRule.JOIN);
}
}
// Rules that we wrote.

View File

@ -426,6 +426,12 @@ public abstract class DruidAvaticaHandlerTest extends CalciteTestBase
Pair.of("TABLE_NAME", CalciteTests.SOMEXDATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.USERVISITDATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
),
getRows(
@ -494,6 +500,12 @@ public abstract class DruidAvaticaHandlerTest extends CalciteTestBase
Pair.of("TABLE_NAME", CalciteTests.SOMEXDATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.USERVISITDATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
),
getRows(

View File

@ -891,4 +891,16 @@ public class BaseCalciteQueryTest extends CalciteTestBase
newContext.put(QueryContexts.SQL_JOIN_LEFT_SCAN_DIRECT, true);
return newContext;
}
/**
* Reset the walker and conglomerate with required number of merge buffers. Default value is 2.
*/
protected void requireMergeBuffers(int numMergeBuffers) throws IOException
{
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(
resourceCloser,
QueryStackTests.getProcessingConfig(true, numMergeBuffers)
);
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
}
}

View File

@ -20,12 +20,8 @@
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.query.QueryDataSource;
@ -42,58 +38,21 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Before;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@RunWith(JUnitParamsRunner.class)
public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
{
private static final IncrementalIndexSchema INDEX_SCHEMA = new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt")
)
.withRollup(false)
.withMinTimestamp(DateTimes.of("2020-12-31").getMillis())
.build();
private static final List<String> DIMENSIONS = ImmutableList.of("user", "country", "city");
@Before
public void setup() throws Exception
{
final QueryableIndex index1 = IndexBuilder
.create()
.tmpDir(new File(temporaryFolder.newFolder(), "1"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(INDEX_SCHEMA)
.rows(getRawRows())
.buildMMappedIndex();
final DataSegment segment = DataSegment.builder()
.dataSource("visits")
.interval(index1.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
walker.add(segment, index1);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@ -115,12 +74,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource("visits"),
new TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource("visits")
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@ -222,12 +181,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource("visits"),
new TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource("visits")
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@ -304,7 +263,7 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
new QueryDataSource(newScanQueryBuilder().dataSource("visits")
new QueryDataSource(newScanQueryBuilder().dataSource(CalciteTests.USERVISITDATASOURCE)
.intervals(querySegmentSpec(Intervals.of(
"2021-01-01T01:00:00.000Z/2021-01-02T23:59:59.001Z")))
.filters(selector("city", "B", null))
@ -314,7 +273,7 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource("visits")
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@ -390,12 +349,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource("visits"),
new TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource("visits")
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@ -477,12 +436,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
new TableDataSource("visits"),
new TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource("visits")
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@ -544,26 +503,4 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
);
}
private List<InputRow> getRawRows()
{
return ImmutableList.of(
toRow("2021-01-01T01:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "A")),
toRow("2021-01-01T02:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "B")),
toRow("2021-01-01T03:00:00Z", ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
toRow("2021-01-01T04:00:00Z", ImmutableMap.of("user", "alice", "country", "India", "city", "Y")),
toRow("2021-01-02T01:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "A")),
toRow("2021-01-02T02:00:00Z", ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
toRow("2021-01-02T03:00:00Z", ImmutableMap.of("user", "foo", "country", "canada", "city", "B")),
toRow("2021-01-02T04:00:00Z", ImmutableMap.of("user", "bar", "country", "canada", "city", "B")),
toRow("2021-01-02T05:00:00Z", ImmutableMap.of("user", "alice", "country", "India", "city", "X")),
toRow("2021-01-02T06:00:00Z", ImmutableMap.of("user", "bob", "country", "India", "city", "X")),
toRow("2021-01-02T07:00:00Z", ImmutableMap.of("user", "foo", "country", "India", "city", "X")),
toRow("2021-01-03T01:00:00Z", ImmutableMap.of("user", "foo", "country", "USA", "city", "M"))
);
}
private MapBasedInputRow toRow(String time, Map<String, Object> event)
{
return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), DIMENSIONS, event);
}
}

View File

@ -111,6 +111,7 @@ import org.apache.druid.sql.SqlPlanningException.PlanningError;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.util.CalciteTests;
@ -877,6 +878,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
@ -912,6 +914,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"})
@ -7609,6 +7612,72 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testMultipleExactCountDistinctWithGroupingAndOtherAggregators() throws Exception
{
requireMergeBuffers(4);
testQuery(
PLANNER_CONFIG_NO_HLL.withOverrides(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT, "true")),
"SELECT FLOOR(__time to day), COUNT(distinct city), COUNT(distinct user) FROM druid.visits GROUP BY 1",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn(
"v0",
"timestamp_floor(\"__time\",'P1D',null,'UTC')",
ValueType.LONG
))
.setDimensions(dimensions(
new DefaultDimensionSpec("v0", "d0", ValueType.LONG),
new DefaultDimensionSpec("city", "d1"),
new DefaultDimensionSpec("user", "d2")
))
.setAggregatorSpecs(aggregators(new GroupingAggregatorFactory(
"a0",
Arrays.asList(
"v0",
"city",
"user"
)
)))
.setSubtotalsSpec(ImmutableList.of(
ImmutableList.of("d0", "d1"),
ImmutableList.of("d0", "d2")
))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.LONG)))
.setAggregatorSpecs(aggregators(
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0"),
and(not(selector("d1", null, null)), selector("a0", "1", null))
),
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a1"),
and(not(selector("d2", null, null)), selector("a0", "2", null))
)
))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{1609459200000L, 3L, 2L},
new Object[]{1609545600000L, 3L, 4L},
new Object[]{1609632000000L, 1L, 1L}
)
);
}
@Test
public void testApproxCountDistinct() throws Exception
{
@ -7760,6 +7829,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testDoubleNestedGroupBy() throws Exception
{
requireMergeBuffers(3);
testQuery(
"SELECT SUM(cnt), COUNT(*) FROM (\n"
+ " SELECT dim2, SUM(t1.cnt) cnt FROM (\n"
@ -12521,6 +12591,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testGroupingAggregatorDifferentOrder() throws Exception
{
requireMergeBuffers(3);
// Cannot vectorize due to virtual columns.
cannotVectorize();

View File

@ -35,6 +35,7 @@ import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
@ -160,6 +161,7 @@ public class CalciteTests
public static final String SOME_DATASOURCE = "some_datasource";
public static final String SOME_DATSOURCE_ESCAPED = "some\\_datasource";
public static final String SOMEXDATASOURCE = "somexdatasource";
public static final String USERVISITDATASOURCE = "visits";
public static final String DRUID_SCHEMA_NAME = "druid";
public static final String INFORMATION_SCHEMA_NAME = "INFORMATION_SCHEMA";
public static final String SYSTEM_SCHEMA_NAME = "sys";
@ -365,6 +367,15 @@ public class CalciteTests
.withRollup(false)
.build();
private static final List<String> USER_VISIT_DIMS = ImmutableList.of("user", "country", "city");
private static final IncrementalIndexSchema INDEX_SCHEMA_USER_VISIT = new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt")
)
.withRollup(false)
.withMinTimestamp(DateTimes.of("2020-12-31").getMillis())
.build();
public static final List<ImmutableMap<String, Object>> RAW_ROWS1 = ImmutableList.of(
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-01")
@ -647,6 +658,33 @@ public class CalciteTests
)
);
private static List<InputRow> USER_VISIT_ROWS = ImmutableList.of(
toRow(
"2021-01-01T01:00:00Z",
USER_VISIT_DIMS,
ImmutableMap.of("user", "alice", "country", "canada", "city", "A")
),
toRow(
"2021-01-01T02:00:00Z",
USER_VISIT_DIMS,
ImmutableMap.of("user", "alice", "country", "canada", "city", "B")
),
toRow("2021-01-01T03:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
toRow("2021-01-01T04:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "alice", "country", "India", "city", "Y")),
toRow(
"2021-01-02T01:00:00Z",
USER_VISIT_DIMS,
ImmutableMap.of("user", "alice", "country", "canada", "city", "A")
),
toRow("2021-01-02T02:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
toRow("2021-01-02T03:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "canada", "city", "B")),
toRow("2021-01-02T04:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bar", "country", "canada", "city", "B")),
toRow("2021-01-02T05:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "alice", "country", "India", "city", "X")),
toRow("2021-01-02T06:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "India", "city", "X")),
toRow("2021-01-02T07:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "India", "city", "X")),
toRow("2021-01-03T01:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "USA", "city", "M"))
);
private static final InlineDataSource JOINABLE_BACKING_DATA = InlineDataSource.fromIterable(
RAW_ROWS1_WITH_NUMERIC_DIMS.stream().map(x -> new Object[]{
x.get("dim1"),
@ -856,6 +894,14 @@ public class CalciteTests
.rows(RAW_ROWS1_X)
.buildMMappedIndex();
final QueryableIndex userVisitIndex = IndexBuilder
.create()
.tmpDir(new File(tmpDir, "8"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(INDEX_SCHEMA)
.rows(USER_VISIT_ROWS)
.buildMMappedIndex();
return new SpecificSegmentsQuerySegmentWalker(
conglomerate,
@ -943,9 +989,23 @@ public class CalciteTests
.size(0)
.build(),
indexNumericDims
).add(
DataSegment.builder()
.dataSource(USERVISITDATASOURCE)
.interval(userVisitIndex.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build(),
userVisitIndex
);
}
private static MapBasedInputRow toRow(String time, List<String> dimensions, Map<String, Object> event)
{
return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), dimensions, event);
}
public static ExprMacroTable createExprMacroTable()
{
final List<ExprMacroTable.ExprMacro> exprMacros = new ArrayList<>();

View File

@ -1540,6 +1540,7 @@ druid.server.tier
druid.sql.planner.maxSemiJoinRowsInMemory
druid.sql.planner.sqlTimeZone
druid.sql.planner.useApproximateCountDistinct
druid.sql.planner.useGroupingSetForExactDistinct
druid.sql.planner.useApproximateTopN
error_msg
exprs
@ -1572,6 +1573,7 @@ timestamp_expr
tls_port
total_size
useApproximateCountDistinct
useGroupingSetForExactDistinct
useApproximateTopN
wikipedia
- ../docs/querying/timeseriesquery.md