Count distinct returned incorrect results without useApproximateCountDistinct (#14748)

* fix grouping engine handling of summaries when result set is empty
Zoltan Haindrich 2023-09-12 22:57:54 +02:00 committed by GitHub
parent 0f38a37b9d
commit 5d16d0edf0
5 changed files with 380 additions and 1 deletion


@@ -41,7 +41,7 @@ import java.util.regex.Pattern;
public abstract class Granularity implements Cacheable
{
-public static Comparator<Granularity> IS_FINER_THAN = new Comparator<Granularity>()
+public static final Comparator<Granularity> IS_FINER_THAN = new Comparator<Granularity>()
{
@Override
/**
@@ -215,6 +215,16 @@ public abstract class Granularity implements Cacheable
return vals;
}
/**
* Decides whether this granularity is finer than the other granularity
*
* @return true if this {@link Granularity} is finer than the passed one
*/
public boolean isFinerThan(Granularity g)
{
return IS_FINER_THAN.compare(this, g) < 0;
}
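
A brief usage sketch of the new method (assuming the standard `Granularities` constants; this demo class is not part of the commit):

```java
import org.apache.druid.java.util.common.granularity.Granularities;

public class FinerThanDemo
{
  public static void main(String[] args)
  {
    // IS_FINER_THAN orders finer granularities first, so compare(this, g) < 0
    // means "strictly finer".
    System.out.println(Granularities.MINUTE.isFinerThan(Granularities.DAY)); // true
    System.out.println(Granularities.ALL.isFinerThan(Granularities.DAY));    // false (ALL is coarsest)
    System.out.println(Granularities.DAY.isFinerThan(Granularities.DAY));    // false (not strictly finer)
  }
}
```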
/**
* Return an iterable of granular buckets that overlap a particular interval.
*


@@ -49,6 +49,7 @@ public class Sequences
return (Sequence<T>) EMPTY_SEQUENCE;
}
@SafeVarargs
public static <T> Sequence<T> concat(Sequence<T>... sequences)
{
return concat(Arrays.asList(sequences));
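
`@SafeVarargs` asserts that the method only reads from the generic varargs array, which silences the "unchecked generic array creation" warning this call shape otherwise triggers at call sites. A hedged usage sketch (illustrative only, assuming Druid's `Sequences` utilities on the classpath):

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;

public class ConcatDemo
{
  public static void main(String[] args)
  {
    // With @SafeVarargs on concat, this varargs call compiles warning-free.
    Sequence<Integer> s = Sequences.concat(
        Sequences.simple(ImmutableList.of(1, 2)),
        Sequences.simple(ImmutableList.of(3))
    );
    System.out.println(s.toList()); // [1, 2, 3]
  }
}
```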


@@ -36,6 +36,7 @@ import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -66,16 +67,20 @@ import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.filter.AllNullColumnSelectorFactory;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
@@ -436,6 +441,8 @@ public class GroupingEngine
*/
public Sequence<ResultRow> applyPostProcessing(Sequence<ResultRow> results, GroupByQuery query)
{
results = wrapSummaryRowIfNeeded(query, results);
// Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper
if (query.context().getBoolean(CTX_KEY_OUTERMOST, true)) {
return query.postProcess(results);
@@ -726,4 +733,57 @@ public class GroupingEngine
return aggsAndPostAggs;
}
/**
* Wraps the sequence so that a summary row is emitted if the query requires one and the input sequence turns out to be empty.
*/
public static Sequence<ResultRow> wrapSummaryRowIfNeeded(GroupByQuery query, Sequence<ResultRow> process)
{
if (!summaryRowPreconditions(query)) {
return process;
}
// Tracks whether the upstream sequence emitted at least one row.
final AtomicBoolean t = new AtomicBoolean();
return Sequences.concat(
Sequences.map(process, ent -> {
t.set(true);
return ent;
}),
Sequences.simple(() -> {
if (t.get()) {
return Collections.emptyIterator();
}
return summaryRowIterator(query);
}));
}
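
The trick above: the map over `process` flips the flag as rows flow through, and `Sequences.simple` takes an `Iterable` whose iterator is created only after the first sequence is drained, so the summary row is produced exactly when no row was seen, without buffering anything. A minimal, dependency-free sketch of the same pattern using `java.util.stream` (the names here are illustrative, not Druid API):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class DefaultIfEmpty
{
  // Concatenate the source with a lazily-built tail that yields one default
  // element only if the source turned out to be empty; nothing is buffered.
  static <T> Stream<T> defaultIfEmpty(Stream<T> source, Supplier<T> defaultValue)
  {
    AtomicBoolean sawElement = new AtomicBoolean();
    Stream<T> marked = source.peek(x -> sawElement.set(true));
    // flatMap defers reading the flag until this element is pulled, and
    // Stream.concat pulls from the tail only after the head is exhausted.
    Stream<T> tail = Stream.of(0).flatMap(ignored ->
        sawElement.get() ? Stream.<T>empty() : Stream.of(defaultValue.get()));
    return Stream.concat(marked, tail);
  }

  public static void main(String[] args)
  {
    System.out.println(defaultIfEmpty(Stream.of(1, 2), () -> 0).toList());         // [1, 2]
    System.out.println(defaultIfEmpty(Stream.<Integer>empty(), () -> 0).toList()); // [0]
  }
}
```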
private static boolean summaryRowPreconditions(GroupByQuery query)
{
LimitSpec limit = query.getLimitSpec();
if (limit instanceof DefaultLimitSpec) {
DefaultLimitSpec limitSpec = (DefaultLimitSpec) limit;
if (limitSpec.getLimit() == 0 || limitSpec.getOffset() > 0) {
return false;
}
}
if (!query.getDimensions().isEmpty()) {
return false;
}
if (query.getGranularity().isFinerThan(Granularities.ALL)) {
return false;
}
return true;
}
private static Iterator<ResultRow> summaryRowIterator(GroupByQuery q)
{
List<AggregatorFactory> aggSpec = q.getAggregatorSpecs();
Object[] values = new Object[aggSpec.size()];
for (int i = 0; i < aggSpec.size(); i++) {
values[i] = aggSpec.get(i).factorize(new AllNullColumnSelectorFactory()).get();
}
return Collections.singleton(ResultRow.of(values)).iterator();
}
}
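
For intuition, `summaryRowIterator` factorizes each aggregator against an all-null selector and reads it once, yielding that aggregator's identity over zero rows. A rough sketch of what such a row looks like (the concrete values are assumptions that depend on the aggregator and the null-handling mode, mirroring the expectations in the tests below):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class SummaryRowSketch
{
  public static void main(String[] args)
  {
    // Assumed identity values over zero input rows in SQL-compatible null
    // handling; default-value mode would yield 0 / 0.0 instead of null.
    List<Supplier<Object>> identities = Arrays.asList(
        () -> 0L,   // COUNT
        () -> null, // LONG SUM
        () -> null, // FLOAT SUM
        () -> null  // DOUBLE SUM
    );
    Object[] summaryRow = identities.stream().map(Supplier::get).toArray();
    System.out.println(Arrays.toString(summaryRow)); // [0, null, null, null]
  }
}
```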


@@ -12949,6 +12949,157 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}
@Test
public void testSummaryrowForEmptyInput()
{
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimFilter(new SelectorDimFilter("placementish", "xxa", null))
.setAggregatorSpecs(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("idx", "index"),
new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
new DoubleSumAggregatorFactory("idxDouble", "index")
)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.build();
List<ResultRow> expectedResults = ImmutableList.of(
makeRow(
query,
"2011-04-01",
"rows",
0L,
"idx",
NullHandling.replaceWithDefault() ? 0L : null,
"idxFloat",
NullHandling.replaceWithDefault() ? 0.0 : null,
"idxDouble",
NullHandling.replaceWithDefault() ? 0.0 : null
)
);
StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
factory,
originalRunner,
query,
serviceEmitter
);
serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}
@Test
public void testSummaryrowFilteredByHaving()
{
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimFilter(new SelectorDimFilter("placementish", "xxa", null))
.setHavingSpec(new GreaterThanHavingSpec("rows", 99L))
.setAggregatorSpecs(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("idx", "index"),
new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
new DoubleSumAggregatorFactory("idxDouble", "index")
)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.build();
List<ResultRow> expectedResults = ImmutableList.of();
StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
factory,
originalRunner,
query,
serviceEmitter
);
serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}
@Test
public void testSummaryrowForEmptySubqueryInput()
{
GroupByQuery subquery = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimFilter(new SelectorDimFilter("placementish", "xxa", null))
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
GroupByQuery query = makeQueryBuilder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setAggregatorSpecs(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("idx", "index"),
new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
new DoubleSumAggregatorFactory("idxDouble", "index")
)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.build();
List<ResultRow> expectedResults = ImmutableList.of(
makeRow(
query,
"2011-04-01",
"rows",
0L,
"idx",
NullHandling.replaceWithDefault() ? 0L : null,
"idxFloat",
NullHandling.replaceWithDefault() ? 0.0 : null,
"idxDouble",
NullHandling.replaceWithDefault() ? 0.0 : null
)
);
StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
factory,
originalRunner,
query,
serviceEmitter
);
serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}
@Test
public void testSummaryrowForEmptyInputByDay()
{
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimFilter(new SelectorDimFilter("placementish", "xxa", null))
.setAggregatorSpecs(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("idx", "index"),
new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
new DoubleSumAggregatorFactory("idxDouble", "index")
)
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
List<ResultRow> expectedResults = ImmutableList.of();
StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
factory,
originalRunner,
query,
serviceEmitter
);
serviceEmitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}
private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals)
{
return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);


@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.DateTimes;
@@ -46,6 +47,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.joda.time.DateTime;
@@ -1946,4 +1948,159 @@ public class CalciteSelectQueryTest extends BaseCalciteQueryTest
.build()),
ImmutableList.of(new Object[] {0L}));
}
@Test
public void testCountDistinctNonApproximateEmptySet()
{
cannotVectorize();
testQuery(
PLANNER_CONFIG_DEFAULT.withOverrides(
ImmutableMap.of(
PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)),
"select count(distinct m1) from druid.foo where m1 < -1.0",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
dimensions(
new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT)))
.setDimFilter(
range("m1", ColumnType.DOUBLE, null, -1.0, false, true))
.build())
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(
useDefault
? new CountAggregatorFactory("a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
notNull("d0"))))
.build()
),
ImmutableList.of(new Object[] {0L}));
}
@Test
public void testCountDistinctNonApproximateBasic()
{
cannotVectorize();
testQuery(
PLANNER_CONFIG_DEFAULT.withOverrides(
ImmutableMap.of(
PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)),
"select count(distinct m1) from druid.foo where m1 < 111.0",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
dimensions(
new DefaultDimensionSpec("m1", "d0", ColumnType.FLOAT)))
.setDimFilter(
range("m1", ColumnType.DOUBLE, null, 111.0, false, true))
.build())
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(useDefault
? new CountAggregatorFactory("a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
notNull("d0"))))
.build()
),
ImmutableList.of(new Object[] {6L}));
}
@Test
public void testCountDistinctNonApproximateWithFilter()
{
cannotVectorize();
testQuery(
PLANNER_CONFIG_DEFAULT.withOverrides(
ImmutableMap.of(
PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)),
"select count(distinct m1) FILTER (where m1 < -1.0) from druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
dimensions(
new DefaultDimensionSpec("v0", "d0", ColumnType.FLOAT)))
.setVirtualColumns(
expressionVirtualColumn("v0", "case_searched((\"m1\" < -1.0),\"m1\",null)",
ColumnType.FLOAT))
.build())
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(
useDefault
? new CountAggregatorFactory("a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
notNull("d0"))))
.build()
),
// returning 1 is an incorrect result, but it is the expected outcome when nulls are replaced with defaults
ImmutableList.of(new Object[] {useDefault ? 1L : 0L}));
}
@Test
public void testCountDistinctNonApproximateWithFilterHaving()
{
cannotVectorize();
testQuery(
PLANNER_CONFIG_DEFAULT.withOverrides(
ImmutableMap.of(
PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)),
"select count(distinct m1) FILTER (where m1 < -1.0) c from druid.foo HAVING c > 3",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
dimensions(
new DefaultDimensionSpec("v0", "d0", ColumnType.FLOAT)))
.setVirtualColumns(
expressionVirtualColumn("v0", "case_searched((\"m1\" < -1.0),\"m1\",null)",
ColumnType.FLOAT))
.build())
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setHavingSpec(having(
range("a0", ColumnType.LONG, 3L, null, true, false)
))
.setAggregatorSpecs(aggregators(
useDefault
? new CountAggregatorFactory("a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
notNull("d0"))))
.build()
),
ImmutableList.of());
}
}