fix bug with sqlOuterLimit, use sqlOuterLimit in web console (#8919)

* fix bug with sqlOuterLimit, use sqlOuterLimit instead of wrapping sql query for web console

* fixes, refactors, tests

* meh

* better name

* fix comment location

* fix copy and paste
Author: Clint Wylie, 2019-12-03 18:36:28 -08:00 (committed by Gian Merlino)
Parent: 187cf0dd3f
Commit: d0a6fe7f12
6 changed files with 245 additions and 88 deletions
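In short, the web console previously enforced its row limit by rewriting the SQL text (the removed wrapInLimitIfNeeded helper produced SELECT * FROM (original query) LIMIT n); with this commit the SQL is sent unchanged and the limit rides along in the query context under the sqlOuterLimit key (PlannerContext.CTX_SQL_OUTER_LIMIT). A minimal, self-contained sketch of the two approaches follows; the class and variable names are illustrative and not part of the commit:

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class OuterLimitSketch
{
  public static void main(String[] args)
  {
    final String userSql = "SELECT dim1 FROM druid.foo ORDER BY __time DESC";
    final int limit = 100;

    // Old console behavior (what wrapInLimitIfNeeded did): wrap the SQL text itself.
    final String wrapped = "SELECT * FROM (" + userSql + "\n) LIMIT " + limit;

    // New behavior: leave the SQL alone and pass the limit as a query-context entry instead.
    final Map<String, Object> context = ImmutableMap.of("sqlOuterLimit", limit);

    System.out.println(wrapped);
    System.out.println(context);
  }
}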

Changed file: Calcites.java (org.apache.druid.sql.calcite.planner)

@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexLiteral;
@@ -378,4 +379,37 @@ public class Calcites
{
return StringUtils.format("%s:%s", prefix, suffix);
}
public static int getInt(RexNode rex, int defaultValue)
{
return rex == null ? defaultValue : RexLiteral.intValue(rex);
}
public static int getOffset(Sort sort)
{
return Calcites.getInt(sort.offset, 0);
}
public static int getFetch(Sort sort)
{
return Calcites.getInt(sort.fetch, -1);
}
public static int collapseFetch(int innerFetch, int outerFetch, int outerOffset)
{
final int fetch;
if (innerFetch < 0 && outerFetch < 0) {
// Neither has a limit => no limit overall.
fetch = -1;
} else if (innerFetch < 0) {
// Outer limit only.
fetch = outerFetch;
} else if (outerFetch < 0) {
// Inner limit only.
fetch = Math.max(0, innerFetch - outerOffset);
} else {
fetch = Math.max(0, Math.min(innerFetch - outerOffset, outerFetch));
}
return fetch;
}
}
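The collapse rules above read more easily with concrete numbers: a fetch of -1 means "no limit", and the outer query's offset consumes rows from whatever the inner query was willing to return. A minimal sketch (not part of the commit, assuming the druid-sql module and its dependencies are on the classpath) exercising the new helper:

import org.apache.druid.sql.calcite.planner.Calcites;

public class CollapseFetchExample
{
  public static void main(String[] args)
  {
    System.out.println(Calcites.collapseFetch(-1, -1, 0));   // -1: neither side has a limit, so no limit overall
    System.out.println(Calcites.collapseFetch(-1, 2, 0));    //  2: outer limit only
    System.out.println(Calcites.collapseFetch(100, -1, 10)); // 90: inner limit only, reduced by the outer offset
    System.out.println(Calcites.collapseFetch(100, 2, 10));  //  2: max(0, min(100 - 10, 2))
    System.out.println(Calcites.collapseFetch(5, 100, 10));  //  0: the outer offset exhausts the inner limit
  }
}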

Changed file: DruidPlanner.java (org.apache.druid.sql.calcite.planner)

@@ -19,11 +19,11 @@
package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.interpreter.BindableConvention;
@@ -35,6 +35,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
@@ -139,31 +140,21 @@ public class DruidPlanner implements Closeable
if (explain != null) {
return planExplanation(druidRel, explain, dataSourceNames);
} else {
final Supplier<Sequence<Object[]>> resultsSupplier = new Supplier<Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> get()
{
if (root.isRefTrivial()) {
return druidRel.runQuery();
} else {
// Add a mapping on top to accommodate root.fields.
return Sequences.map(
druidRel.runQuery(),
new Function<Object[], Object[]>()
{
@Override
public Object[] apply(final Object[] input)
{
final Object[] retVal = new Object[root.fields.size()];
for (int i = 0; i < root.fields.size(); i++) {
retVal[i] = input[root.fields.get(i).getKey()];
}
return retVal;
}
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
if (root.isRefTrivial()) {
return druidRel.runQuery();
} else {
// Add a mapping on top to accommodate root.fields.
return Sequences.map(
druidRel.runQuery(),
input -> {
final Object[] retVal = new Object[root.fields.size()];
for (int i = 0; i < root.fields.size(); i++) {
retVal[i] = input[root.fields.get(i).getKey()];
}
);
}
return retVal;
}
);
}
};
@@ -212,9 +203,9 @@ public class DruidPlanner implements Closeable
new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
{
@Override
public EnumeratorIterator make()
public EnumeratorIterator<Object[]> make()
{
return new EnumeratorIterator(new Iterator<Object[]>()
return new EnumeratorIterator<>(new Iterator<Object[]>()
{
@Override
public boolean hasNext()
@@ -236,16 +227,19 @@ public class DruidPlanner implements Closeable
}
}
), () -> enumerator.close());
), enumerator::close);
};
return new PlannerResult(resultsSupplier, root.validatedRowType, ImmutableSet.of());
}
}
/**
* This method wraps the root with a logical sort that applies a limit (no ordering change).
* The CTX_SQL_OUTER_LIMIT flag that controls this wrapping is meant for internal use only by the
* web console, allowing it to apply a limit to queries without rewriting the original SQL.
* This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
* is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
* {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
*
* The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
* the web console, allowing it to apply a limit to queries without rewriting the original SQL.
*
* @param root root node
* @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
@@ -261,6 +255,23 @@ public class DruidPlanner implements Closeable
return root.rel;
}
if (root.rel instanceof Sort) {
Sort innerSort = (Sort) root.rel;
final int offset = Calcites.getOffset(innerSort);
final int fetch = Calcites.collapseFetch(
Calcites.getFetch(innerSort),
Ints.checkedCast(outerLimit),
0
);
return LogicalSort.create(
innerSort.getInput(),
innerSort.collation,
makeBigIntLiteral(offset),
makeBigIntLiteral(fetch)
);
}
return LogicalSort.create(
root.rel,
root.collation,
@@ -282,7 +293,7 @@ public class DruidPlanner implements Closeable
{
private final Iterator<T> it;
public EnumeratorIterator(Iterator<T> it)
EnumeratorIterator(Iterator<T> it)
{
this.it = it;
}
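To make the new merge branch in possiblyWrapRootWithOuterLimit concrete: if the user's query already ends in ORDER BY __time DESC LIMIT 100 OFFSET 10 and the console submits it with sqlOuterLimit = 2, the planner does not stack a second Sort on top. It keeps the inner sort's collation and offset (10) and collapses the fetch to Calcites.collapseFetch(100, 2, 0) = max(0, min(100 - 0, 2)) = 2, producing a single LogicalSort with offset 10 and fetch 2. The outer offset passed to collapseFetch is always 0 here because the context flag supplies only a limit, never an offset.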

Changed file: SortCollapseRule.java (org.apache.druid.sql.calcite.rule)

@@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.rule;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexLiteral;
import org.apache.druid.sql.calcite.planner.Calcites;
/**
* Collapses two adjacent Sort operations together. Useful for queries like
@@ -45,48 +45,30 @@ public class SortCollapseRule extends RelOptRule
@Override
public void onMatch(final RelOptRuleCall call)
{
// First is the inner sort, second is the outer sort.
final Sort first = call.rel(1);
final Sort second = call.rel(0);
final Sort outerSort = call.rel(0);
final Sort innerSort = call.rel(1);
if (outerSort.collation.getFieldCollations().isEmpty()
|| outerSort.collation.getFieldCollations().equals(innerSort.collation.getFieldCollations())) {
final int innerOffset = Calcites.getOffset(innerSort);
final int innerFetch = Calcites.getFetch(innerSort);
final int outerOffset = Calcites.getOffset(outerSort);
final int outerFetch = Calcites.getFetch(outerSort);
if (second.collation.getFieldCollations().isEmpty()
|| second.collation.getFieldCollations().equals(first.collation.getFieldCollations())) {
// Add up the offsets.
final int firstOffset = (first.offset != null ? RexLiteral.intValue(first.offset) : 0);
final int secondOffset = (second.offset != null ? RexLiteral.intValue(second.offset) : 0);
final int offset = innerOffset + outerOffset;
final int fetch = Calcites.collapseFetch(innerFetch, outerFetch, outerOffset);
final int offset = firstOffset + secondOffset;
final int fetch;
if (first.fetch == null && second.fetch == null) {
// Neither has a limit => no limit overall.
fetch = -1;
} else if (first.fetch == null) {
// Outer limit only.
fetch = RexLiteral.intValue(second.fetch);
} else if (second.fetch == null) {
// Inner limit only.
fetch = Math.max(0, RexLiteral.intValue(first.fetch) - secondOffset);
} else {
fetch = Math.max(
0,
Math.min(
RexLiteral.intValue(first.fetch) - secondOffset,
RexLiteral.intValue(second.fetch)
)
);
}
final Sort combined = first.copy(
first.getTraitSet(),
first.getInput(),
first.getCollation(),
final Sort combined = innerSort.copy(
innerSort.getTraitSet(),
innerSort.getInput(),
innerSort.getCollation(),
offset == 0 ? null : call.builder().literal(offset),
fetch < 0 ? null : call.builder().literal(fetch)
);
call.transformTo(combined);
call.getPlanner().setImportance(second, 0.0);
call.getPlanner().setImportance(outerSort, 0.0);
}
}
}
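As a worked example of the collapsed arithmetic: an inner sort with OFFSET 10 and LIMIT 100 sitting under an outer sort with OFFSET 3 and LIMIT 5 (and an empty or identical collation) combines into a single sort with offset 10 + 3 = 13 and fetch Calcites.collapseFetch(100, 5, 3) = max(0, min(100 - 3, 5)) = 5; the rule then sets the outer sort's importance to 0 so the planner drops it.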

Changed file: BaseCalciteQueryTest.java

@@ -214,6 +214,8 @@ public class BaseCalciteQueryTest extends CalciteTestBase
// Matches QUERY_CONTEXT_LOS_ANGELES
public static final Map<String, Object> TIMESERIES_CONTEXT_LOS_ANGELES = new HashMap<>();
public static final Map<String, Object> OUTER_LIMIT_CONTEXT = new HashMap<>(QUERY_CONTEXT_DEFAULT);
public static QueryRunnerFactoryConglomerate conglomerate;
public static Closer resourceCloser;
@@ -236,6 +238,8 @@ public class BaseCalciteQueryTest extends CalciteTestBase
TIMESERIES_CONTEXT_LOS_ANGELES.put("skipEmptyBuckets", true);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS);
TIMESERIES_CONTEXT_LOS_ANGELES.put(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
OUTER_LIMIT_CONTEXT.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 2);
}
// Generate timestamps for expected results

Changed file: CalciteQueryTest.java

@@ -786,13 +786,14 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new Object[]{"10.1"}
)
);
}
// The outer limit wrapping behavior that was used in the query above can be applied with a context flag instead
Map<String, Object> outerLimitContext = new HashMap<>(QUERY_CONTEXT_DEFAULT);
outerLimitContext.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 2);
@Test
public void testSelectLimitWrapping() throws Exception
{
testQuery(
"SELECT dim1 FROM druid.foo ORDER BY __time DESC",
outerLimitContext,
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@@ -801,7 +802,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(outerLimitContext)
.context(OUTER_LIMIT_CONTEXT)
.build()
),
ImmutableList.of(
@@ -811,6 +812,138 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testTopNLimitWrapping() throws Exception
{
List<Object[]> expected;
if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of(
new Object[]{"", 1L},
new Object[]{"def", 1L}
);
} else {
expected = ImmutableList.of(
new Object[]{"def", 1L},
new Object[]{"abc", 1L}
);
}
testQuery(
"SELECT dim1, COUNT(*) FROM druid.foo GROUP BY dim1 ORDER BY dim1 DESC",
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.dimension(new DefaultDimensionSpec("dim1", "d0", ValueType.STRING))
.threshold(2)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.metric(
new InvertedTopNMetricSpec(
new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC)
)
)
.context(OUTER_LIMIT_CONTEXT)
.build()
),
expected
);
}
@Test
public void testTopNLimitWrappingOrderByAgg() throws Exception
{
testQuery(
"SELECT dim1, COUNT(*) FROM druid.foo GROUP BY 1 ORDER BY 2 DESC",
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.dimension(new DefaultDimensionSpec("dim1", "d0", ValueType.STRING))
.threshold(2)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.metric("a0")
.context(OUTER_LIMIT_CONTEXT)
.build()
),
ImmutableList.of(new Object[]{"", 1L}, new Object[]{"1", 1L})
);
}
@Test
public void testGroupByLimitWrapping() throws Exception
{
List<Object[]> expected;
if (NullHandling.replaceWithDefault()) {
expected = ImmutableList.of(
new Object[]{"def", "abc", 1L},
new Object[]{"abc", "", 1L}
);
} else {
expected = ImmutableList.of(
new Object[]{"def", "abc", 1L},
new Object[]{"abc", null, 1L}
);
}
testQuery(
"SELECT dim1, dim2, COUNT(*) FROM druid.foo GROUP BY dim1, dim2 ORDER BY dim1 DESC",
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
new GroupByQuery.Builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
new DefaultDimensionSpec("dim1", "d0", ValueType.STRING),
new DefaultDimensionSpec("dim2", "d1", ValueType.STRING)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC)),
2
)
)
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(OUTER_LIMIT_CONTEXT)
.build()
),
expected
);
}
@Test
public void testGroupByLimitWrappingOrderByAgg() throws Exception
{
testQuery(
"SELECT dim1, dim2, COUNT(*) FROM druid.foo GROUP BY 1, 2 ORDER BY 3 DESC",
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
new GroupByQuery.Builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
new DefaultDimensionSpec("dim1", "d0", ValueType.STRING),
new DefaultDimensionSpec("dim2", "d1", ValueType.STRING)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC)
),
2
)
)
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(OUTER_LIMIT_CONTEXT)
.build()
),
ImmutableList.of(new Object[]{"", "a", 1L}, new Object[]{"def", "abc", 1L})
);
}
@Test
public void testSelectProjectionFromSelectSingleColumnWithInnerLimitDescending() throws Exception
{

Changed file: query-view.tsx (web console QueryView component)

@@ -131,13 +131,6 @@ export class QueryView extends React.PureComponent<QueryViewProps, QueryViewStat
return /EXPLAIN\sPLAN\sFOR/i.test(query);
}
static wrapInLimitIfNeeded(query: string, limit: number | undefined): string {
query = QueryView.trimSemicolon(query);
if (!limit) return query;
if (QueryView.isExplainQuery(query)) return query;
return `SELECT * FROM (${query}\n) LIMIT ${limit}`;
}
static wrapInExplainIfNeeded(query: string): string {
query = QueryView.trimSemicolon(query);
if (QueryView.isExplainQuery(query)) return query;
@@ -251,17 +244,16 @@ export class QueryView extends React.PureComponent<QueryViewProps, QueryViewStat
if (QueryView.isJsonLike(queryString)) {
jsonQuery = Hjson.parse(queryString);
} else {
const actualQuery = QueryView.wrapInLimitIfNeeded(queryString, wrapQueryLimit);
jsonQuery = {
query: actualQuery,
query: queryString,
resultFormat: 'array',
header: true,
};
}
if (!isEmptyContext(queryContext)) {
jsonQuery.context = Object.assign(jsonQuery.context || {}, queryContext);
if (!isEmptyContext(queryContext) || wrapQueryLimit) {
jsonQuery.context = Object.assign({}, jsonQuery.context || {}, queryContext);
jsonQuery.context.sqlOuterLimit = wrapQueryLimit;
}
let rawQueryResult: unknown;
@@ -320,14 +312,15 @@ export class QueryView extends React.PureComponent<QueryViewProps, QueryViewStat
processQuery: async (queryWithContext: QueryWithContext) => {
const { queryString, queryContext, wrapQueryLimit } = queryWithContext;
const actualQuery = QueryView.wrapInLimitIfNeeded(queryString, wrapQueryLimit);
const explainPayload: Record<string, any> = {
query: QueryView.wrapInExplainIfNeeded(actualQuery),
query: QueryView.wrapInExplainIfNeeded(queryString),
resultFormat: 'object',
};
if (!isEmptyContext(queryContext)) explainPayload.context = queryContext;
if (!isEmptyContext(queryContext) || wrapQueryLimit) {
explainPayload.context = Object.assign({}, queryContext || {});
explainPayload.context.sqlOuterLimit = wrapQueryLimit;
}
const result = await queryDruidSql(explainPayload);
return parseQueryPlan(result[0]['PLAN']);