diff --git a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java index ca307886a79..572467f7c74 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java @@ -41,7 +41,7 @@ import java.util.regex.Pattern; public abstract class Granularity implements Cacheable { - public static Comparator IS_FINER_THAN = new Comparator() + public static final Comparator IS_FINER_THAN = new Comparator() { @Override /** @@ -215,6 +215,16 @@ public abstract class Granularity implements Cacheable return vals; } + /** + * Decides whether this granularity is finer than the other granularity + * + * @return true if this {@link Granularity} is finer than the passed one + */ + public boolean isFinerThan(Granularity g) + { + return IS_FINER_THAN.compare(this, g) < 0; + } + /** * Return an iterable of granular buckets that overlap a particular interval. * diff --git a/processing/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java b/processing/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java index 3dc44cb063b..9f8169434af 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java @@ -49,6 +49,7 @@ public class Sequences return (Sequence) EMPTY_SEQUENCE; } + @SafeVarargs public static Sequence concat(Sequence... sequences) { return concat(Arrays.asList(sequences)); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 112f6ea25ed..b79c4358a3d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -36,6 +36,7 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.Sequence; @@ -66,16 +67,20 @@ import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.join.filter.AllNullColumnSelectorFactory; import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BinaryOperator; import java.util.stream.Collectors; @@ -436,6 +441,8 @@ public class GroupingEngine */ public Sequence applyPostProcessing(Sequence results, GroupByQuery query) { + results = wrapSummaryRowIfNeeded(query, results); + // Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper if (query.context().getBoolean(CTX_KEY_OUTERMOST, true)) { return query.postProcess(results); @@ -726,4 +733,57 @@ public class GroupingEngine return aggsAndPostAggs; } + + /** + * Wraps the sequence around if for this query a summary row might be needed in case the input becomes empty. + */ + public static Sequence wrapSummaryRowIfNeeded(GroupByQuery query, Sequence process) + { + if (!summaryRowPreconditions(query)) { + return process; + } + + final AtomicBoolean t = new AtomicBoolean(); + + return Sequences.concat( + Sequences.map(process, ent -> { + t.set(true); + return ent; + }), + Sequences.simple(() -> { + if (t.get()) { + return Collections.emptyIterator(); + } + return summaryRowIterator(query); + })); + } + + private static boolean summaryRowPreconditions(GroupByQuery query) + { + LimitSpec limit = query.getLimitSpec(); + if (limit instanceof DefaultLimitSpec) { + DefaultLimitSpec limitSpec = (DefaultLimitSpec) limit; + if (limitSpec.getLimit() == 0 || limitSpec.getOffset() > 0) { + return false; + } + } + if (!query.getDimensions().isEmpty()) { + return false; + } + if (query.getGranularity().isFinerThan(Granularities.ALL)) { + return false; + } + return true; + } + + private static Iterator summaryRowIterator(GroupByQuery q) + { + List aggSpec = q.getAggregatorSpecs(); + Object[] values = new Object[aggSpec.size()]; + for (int i = 0; i < aggSpec.size(); i++) { + values[i] = aggSpec.get(i).factorize(new AllNullColumnSelectorFactory()).get(); + } + return Collections.singleton(ResultRow.of(values)).iterator(); + } + } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 033f64bcb9c..c8f5aa7dfca 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -12949,6 +12949,157 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); } + @Test + public void testSummaryrowForEmptyInput() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimFilter(new SelectorDimFilter("placementish", "xxa", null)) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List expectedResults = ImmutableList.of( + makeRow( + query, + "2011-04-01", + "rows", + 0L, + "idx", + NullHandling.replaceWithDefault() ? 0L : null, + "idxFloat", + NullHandling.replaceWithDefault() ? 0.0 : null, + "idxDouble", + NullHandling.replaceWithDefault() ? 0.0 : null + ) + ); + + StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter( + factory, + originalRunner, + query, + serviceEmitter + ); + serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1); + TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); + } + + @Test + public void testSummaryrowFilteredByHaving() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimFilter(new SelectorDimFilter("placementish", "xxa", null)) + .setHavingSpec(new GreaterThanHavingSpec("rows", 99L)) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List expectedResults = ImmutableList.of(); + + StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter( + factory, + originalRunner, + query, + serviceEmitter + ); + serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1); + TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); + } + + + @Test + public void testSummaryrowForEmptySubqueryInput() + { + GroupByQuery subquery = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimFilter(new SelectorDimFilter("placementish", "xxa", null)) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + GroupByQuery query = makeQueryBuilder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List expectedResults = ImmutableList.of( + makeRow( + query, + "2011-04-01", + "rows", + 0L, + "idx", + NullHandling.replaceWithDefault() ? 0L : null, + "idxFloat", + NullHandling.replaceWithDefault() ? 0.0 : null, + "idxDouble", + NullHandling.replaceWithDefault() ? 0.0 : null + ) + ); + + StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter( + factory, + originalRunner, + query, + serviceEmitter + ); + serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1); + TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); + } + + + @Test + public void testSummaryrowForEmptyInputByDay() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimFilter(new SelectorDimFilter("placementish", "xxa", null)) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + List expectedResults = ImmutableList.of(); + + StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + Iterable results = GroupByQueryRunnerTestHelper.runQueryWithEmitter( + factory, + originalRunner, + query, + serviceEmitter + ); + serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1); + TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); + } + private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals) { return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java index c90a961d96a..a7ad956baee 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.DateTimes; @@ -46,6 +47,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.util.CalciteTests; import org.joda.time.DateTime; @@ -1946,4 +1948,159 @@ public class CalciteSelectQueryTest extends BaseCalciteQueryTest .build()), ImmutableList.of(new Object[] {0L})); } + + @Test + public void testCountDistinctNonApproximateEmptySet() + { + cannotVectorize(); + testQuery( + PLANNER_CONFIG_DEFAULT.withOverrides( + ImmutableMap.of( + PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)), + "select count(distinct m1) from druid.foo where m1 < -1.0", + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + dimensions( + new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT))) + .setDimFilter( + range("m1", ColumnType.DOUBLE, null, -1.0, false, true)) + .build()) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + useDefault + ? new CountAggregatorFactory("a0") + : new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + notNull("d0")))) + .build() + + ), + ImmutableList.of(new Object[] {0L})); + } + + @Test + public void testCountDistinctNonApproximateBasic() + { + cannotVectorize(); + testQuery( + PLANNER_CONFIG_DEFAULT.withOverrides( + ImmutableMap.of( + PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)), + "select count(distinct m1) from druid.foo where m1 < 111.0", + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + dimensions( + new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT))) + .setDimFilter( + range("m1", ColumnType.DOUBLE, null, 111.0, false, true)) + .build()) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators(useDefault + ? new CountAggregatorFactory("a0") + : new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + notNull("d0")))) + .build() + + ), + ImmutableList.of(new Object[] {6L})); + } + + @Test + public void testCountDistinctNonApproximateWithFilter() + { + cannotVectorize(); + + testQuery( + PLANNER_CONFIG_DEFAULT.withOverrides( + ImmutableMap.of( + PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)), + "select count(distinct m1) FILTER (where m1 < -1.0) from druid.foo", + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.FLOAT))) + .setVirtualColumns( + expressionVirtualColumn("v0", "case_searched((\"m1\" < -1.0),\"m1\",null)", + ColumnType.FLOAT)) + .build()) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + useDefault + ? new CountAggregatorFactory("a0") + : new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + notNull("d0")))) + .build() + + ), + // returning 1 is incorrect result; but with nulls as default that should be expected + ImmutableList.of(new Object[] {useDefault ? 1L : 0L})); + } + + @Test + public void testCountDistinctNonApproximateWithFilterHaving() + { + cannotVectorize(); + + testQuery( + PLANNER_CONFIG_DEFAULT.withOverrides( + ImmutableMap.of( + PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)), + "select count(distinct m1) FILTER (where m1 < -1.0) c from druid.foo HAVING c > 3", + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.FLOAT))) + .setVirtualColumns( + expressionVirtualColumn("v0", "case_searched((\"m1\" < -1.0),\"m1\",null)", + ColumnType.FLOAT)) + .build()) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setHavingSpec(having( + range("a0", ColumnType.LONG, 3L, null, true, false) + )) + .setAggregatorSpecs(aggregators( + useDefault + ? new CountAggregatorFactory("a0") + : new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + notNull("d0")))) + .build() + + ), + ImmutableList.of()); + } }