SQL: Fix toTimeseriesQuery and toTopNQuery. (#4780)

The former would sometimes eat limits, and the latter would sometimes
use the wrong dimension comparator.
This commit is contained in:
Gian Merlino 2017-09-12 14:37:27 -07:00 committed by Fangjin Yang
parent 3a29521273
commit c3a1ce6933
7 changed files with 83 additions and 26 deletions

View File

@@ -348,7 +348,7 @@ public class GroupByQuery extends BaseQuery<Row>
throw new IAE("When forcing limit push down, a limit spec must be provided.");
}
if (((DefaultLimitSpec) limitSpec).getLimit() == Integer.MAX_VALUE) {
if (!((DefaultLimitSpec) limitSpec).isLimited()) {
throw new IAE("When forcing limit push down, the provided limit spec must have a limit.");
}
@@ -373,7 +373,7 @@ public class GroupByQuery extends BaseQuery<Row>
DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;
// If only applying an orderby without a limit, don't try to push down
if (defaultLimitSpec.getLimit() == Integer.MAX_VALUE) {
if (!defaultLimitSpec.isLimited()) {
return false;
}

View File

@@ -110,6 +110,11 @@ public class DefaultLimitSpec implements LimitSpec
return limit;
}
public boolean isLimited()
{
return limit < Integer.MAX_VALUE;
}
@Override
public Function<Sequence<Row>, Sequence<Row>> build(
List<DimensionSpec> dimensions,
@@ -163,16 +168,16 @@ public class DefaultLimitSpec implements LimitSpec
}
if (!sortingNeeded) {
return limit == Integer.MAX_VALUE ? Functions.<Sequence<Row>>identity() : new LimitingFn(limit);
return isLimited() ? new LimitingFn(limit) : Functions.identity();
}
// Materialize the Comparator first for fast-fail error checking.
final Ordering<Row> ordering = makeComparator(dimensions, aggs, postAggs);
if (limit == Integer.MAX_VALUE) {
return new SortingFn(ordering);
} else {
if (isLimited()) {
return new TopNFunction(ordering, limit);
} else {
return new SortingFn(ordering);
}
}

View File

@@ -133,6 +133,11 @@ public class Calcites
public static StringComparator getStringComparatorForSqlTypeName(SqlTypeName sqlTypeName)
{
final ValueType valueType = getValueTypeForSqlTypeName(sqlTypeName);
return getStringComparatorForValueType(valueType);
}
public static StringComparator getStringComparatorForValueType(ValueType valueType)
{
if (ValueType.isNumeric(valueType)) {
return StringComparators.NUMERIC;
} else if (ValueType.STRING == valueType) {

View File

@ -38,7 +38,6 @@ import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.having.DimFilterHavingSpec;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.ordering.StringComparators;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;
import io.druid.query.timeseries.TimeseriesQuery;
@@ -408,10 +407,12 @@ public class DruidQueryBuilder
}
final Granularity queryGranularity;
final boolean descending;
final List<DimensionSpec> dimensions = grouping.getDimensionSpecs();
if (dimensions.isEmpty()) {
queryGranularity = Granularities.ALL;
descending = false;
} else if (dimensions.size() == 1) {
final DimensionSpec dimensionSpec = Iterables.getOnlyElement(dimensions);
final Granularity gran = ExtractionFns.toQueryGranularity(dimensionSpec.getExtractionFn());
@@ -419,32 +420,42 @@ public class DruidQueryBuilder
if (gran == null || !dimensionSpec.getDimension().equals(Column.TIME_COLUMN_NAME)) {
// Timeseries only applies if the single dimension is granular __time.
return null;
} else {
queryGranularity = gran;
}
// Timeseries only applies if sort is null, or if the first sort field is the time dimension.
final boolean sortingOnTime =
limitSpec == null || limitSpec.getColumns().isEmpty()
|| (limitSpec.getLimit() == Integer.MAX_VALUE
&& limitSpec.getColumns().get(0).getDimension().equals(dimensionSpec.getOutputName()));
if (limitSpec != null) {
// If there is a limit spec, timeseries cannot LIMIT; and must be ORDER BY time (or nothing).
if (sortingOnTime) {
queryGranularity = gran;
if (limitSpec.isLimited()) {
return null;
}
if (limitSpec.getColumns().isEmpty()) {
descending = false;
} else {
// We're ok if the first order by is time (since every time value is distinct, the rest of the columns
// wouldn't matter anyway).
final OrderByColumnSpec firstOrderBy = limitSpec.getColumns().get(0);
if (firstOrderBy.getDimension().equals(dimensionSpec.getOutputName())) {
// Order by time.
descending = firstOrderBy.getDirection() == OrderByColumnSpec.Direction.DESCENDING;
} else {
// Order by something else.
return null;
}
}
} else {
return null;
// No limitSpec.
descending = false;
}
} else {
// More than one dimension, timeseries cannot handle.
return null;
}
final Filtration filtration = Filtration.create(filter).optimize(sourceRowSignature);
final boolean descending;
if (limitSpec != null && !limitSpec.getColumns().isEmpty()) {
descending = limitSpec.getColumns().get(0).getDirection() == OrderByColumnSpec.Direction.DESCENDING;
} else {
descending = false;
}
final Map<String, Object> theContext = Maps.newHashMap();
theContext.put("skipEmptyBuckets", true);
theContext.putAll(plannerContext.getQueryContext());
@@ -494,7 +505,7 @@ public class DruidQueryBuilder
limitColumn = new OrderByColumnSpec(
dimensionSpec.getOutputName(),
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
Calcites.getStringComparatorForValueType(dimensionSpec.getOutputType())
);
} else {
limitColumn = Iterables.getOnlyElement(limitSpec.getColumns());

View File

@@ -220,7 +220,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
cost += COST_PER_COLUMN * queryBuilder.getGrouping().getPostAggregators().size();
}
if (queryBuilder.getLimitSpec() != null && queryBuilder.getLimitSpec().getLimit() < Integer.MAX_VALUE) {
if (queryBuilder.getLimitSpec() != null && queryBuilder.getLimitSpec().isLimited()) {
cost *= COST_LIMIT_MULTIPLIER;
}

View File

@@ -648,7 +648,7 @@ public class GroupByRules
}
}
if (!orderBys.isEmpty() || limitSpec.getLimit() < Integer.MAX_VALUE) {
if (!orderBys.isEmpty() || limitSpec.isLimited()) {
return druidRel.withQueryBuilder(
druidRel.getQueryBuilder()
.withAdjustedGrouping(

View File

@@ -5197,6 +5197,42 @@ public class CalciteQueryTest
@Test
public void testTimeseriesWithLimit() throws Exception
{
testQuery(
"SELECT gran, SUM(cnt)\n"
+ "FROM (\n"
+ " SELECT floor(__time TO month) AS gran, cnt\n"
+ " FROM druid.foo\n"
+ ") AS x\n"
+ "GROUP BY gran\n"
+ "LIMIT 1",
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(
new ExtractionDimensionSpec(
"__time",
"d0",
ValueType.LONG,
new TimeFormatExtractionFn(null, null, null, Granularities.MONTH, true)
)
)
.aggregators(AGGS(new LongSumAggregatorFactory("a0", "cnt")))
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.threshold(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{T("2000-01-01"), 3L}
)
);
}
@Test
public void testTimeseriesWithOrderByAndLimit() throws Exception
{
testQuery(
"SELECT gran, SUM(cnt)\n"