From bb487a419389aa68db34d37c05c86953c2599be5 Mon Sep 17 00:00:00 2001
From: Laksh Singla
Date: Tue, 17 Sep 2024 10:06:24 +0530
Subject: [PATCH] Support maxSubqueryBytes for window functions (#16800)

Window queries now acknowledge maxSubqueryBytes.
---
 .../druid/query/ResultSerializationMode.java  |  39 +++++
 .../WindowOperatorQueryQueryToolChest.java    | 138 ++++++++++--
 ...WindowOperatorQueryQueryToolChestTest.java | 162 ++++++++++++++++++
 .../server/ClientQuerySegmentWalker.java      |  33 ++--
 .../server/ClientQuerySegmentWalkerUtils.java |  22 ++-
 .../server/ClientQuerySegmentWalkerTest.java  |   4 +-
 .../sql/calcite/BaseCalciteQueryTest.java     |  82 ++++++++-
 .../sql/calcite/CalciteWindowQueryTest.java   |  52 ++++--
 .../tests/window/arrayAggWithOrderBy.sqlTest  |   4 +-
 .../tests/window/arrayConcatAgg.sqlTest       |   4 +-
 .../tests/window/rank_handling.sqlTest        |   3 -
 11 files changed, 461 insertions(+), 82 deletions(-)
 create mode 100644 processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java
 create mode 100644 processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java

diff --git a/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java b/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java
new file mode 100644
index 00000000000..3d42e8cbc82
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/ResultSerializationMode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+/**
+ * Serialization medium of the query results on the broker. It is currently used to communicate the result's format between
+ * the main query processing walker and the individual toolchests while materializing a subquery's rows.
+ */
+public enum ResultSerializationMode
+{
+  /**
+   * Materialize the inner results as rows
+   */
+  ROWS,
+
+  /**
+   * Materialize the inner results as frames
+   */
+  FRAMES;
+
+  public static final String CTX_SERIALIZATION_PARAMETER = "serialization";
+}
diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java
index bec529eedef..7fb67e8732d 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java
@@ -23,24 +23,30 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.DefaultQueryMetrics;
+import org.apache.druid.query.FrameSignaturePair;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.ResultSerializationMode;
 import org.apache.druid.query.aggregation.MetricManipulationFn;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
 import org.apache.druid.query.rowsandcols.column.NullColumn;
+import org.apache.druid.query.rowsandcols.semantic.FrameMaker;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
 public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndColumns, WindowOperatorQuery>
@@ -50,7 +56,7 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCo
   @Override
   public QueryRunner<RowsAndColumns> mergeResults(QueryRunner<RowsAndColumns> runner)
   {
-    return new RowsAndColumnsUnravelingQueryRunner(
+    return new RowsAndColumnsSerializingQueryRunner(
         (queryPlus, responseContext) -> {
           final WindowOperatorQuery query = (WindowOperatorQuery) queryPlus.getQuery();
           final List<OperatorFactory> opFactories = query.getOperators();
@@ -61,7 +67,7 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCo
           final Supplier<Operator> opSupplier = () -> {
             Operator retVal = new SequenceOperator(
                 runner.run(
-                    queryPlus.withQuery(query.withOperators(new ArrayList())),
+                    queryPlus.withQuery(query.withOperators(new ArrayList<>())),
                     responseContext
                 )
             );
@@ -112,16 +118,29 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCo
       Sequence<RowsAndColumns> resultSequence
   )
   {
-    // Dark magic; see RowsAndColumnsUnravelingQueryRunner.
+    // Dark magic; see RowsAndColumnsSerializingQueryRunner.
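+    // With ResultSerializationMode.ROWS (the default), the merged sequence already contains Object[] rows
+    // rather than RowsAndColumns, so the raw cast below is safe at runtime despite the generic signature.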
     return (Sequence) resultSequence;
   }
 
+  @Override
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
+      WindowOperatorQuery query,
+      Sequence<RowsAndColumns> resultSequence,
+      MemoryAllocatorFactory memoryAllocatorFactory,
+      boolean useNestedForUnknownTypes
+  )
+  {
+    // see RowsAndColumnsSerializingQueryRunner
+    return Optional.of((Sequence) resultSequence);
+  }
+
   /**
-   * This class exists to unravel the RowsAndColumns that are used in this query and make it the return Sequence
-   * actually be a Sequence of rows. This is relatively broken in a number of regards, the most obvious of which
-   * is that it is going to run counter to the stated class on the Generic of the QueryToolChest. That is, the
-   * code makes it look like you are getting a Sequence of RowsAndColumns, but, by using this, the query will
-   * actually ultimately produce a Sequence of Object[]. This works because of type Erasure in Java (it's all Object
+   * This class exists to serialize the RowsAndColumns that are used in this query and make the returned Sequence
+   * actually be a Sequence of rows or frames, as the query requires.
+   * This is relatively broken in a number of regards, the most obvious of which is that it is going to run counter to the stated class on the Generic of the QueryToolChest.
+   * That is, the code makes it look like you are getting a Sequence of RowsAndColumns, but, by using this, the query will
+   * actually ultimately produce a Sequence of Object[] or Frames. This works because of type Erasure in Java (it's all Object
    * at the end of the day).
    *
   * While it might seem like this will break all sorts of things, the Generic type is actually there more as a type
@@ -132,12 +151,12 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCo
     private final QueryRunner<RowsAndColumns> baseQueryRunner;
 
-    private RowsAndColumnsUnravelingQueryRunner(
+    private RowsAndColumnsSerializingQueryRunner(
         QueryRunner<RowsAndColumns> baseQueryRunner
     )
     {
@@ -158,42 +177,77 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCo
-          final RowSignature rowSignature = query.getRowSignature();
-          return baseSequence.flatMap(
-              rac -> {
-                List<Object[]> results = new ArrayList<>(rac.numRows());
-
-                ColumnAccessor[] accessors = new ColumnAccessor[rowSignature.size()];
-                int index = 0;
-                for (String columnName : rowSignature.getColumnNames()) {
-                  final Column column = rac.findColumn(columnName);
-                  if (column == null) {
-                    final ColumnType columnType = rowSignature
-                        .getColumnType(columnName)
-                        .orElse(ColumnType.UNKNOWN_COMPLEX);
-
-                    accessors[index] = new NullColumn.Accessor(columnType, rac.numRows());
-                  } else {
-                    accessors[index] = column.toAccessor();
-                  }
-                  ++index;
-                }
-
-                for (int i = 0; i < rac.numRows(); ++i) {
-                  Object[] objArr = new Object[accessors.length];
-                  for (int j = 0; j < accessors.length; j++) {
-                    objArr[j] = accessors[j].getObject(i);
-                  }
-                  results.add(objArr);
-                }
-
-                return Sequences.simple(results);
-              }
+          final ResultSerializationMode serializationMode = query.context().getEnum(
+              ResultSerializationMode.CTX_SERIALIZATION_PARAMETER,
+              ResultSerializationMode.class,
+              ResultSerializationMode.ROWS
           );
+          switch (serializationMode) {
+            case ROWS:
+              return asRows(baseSequence, query);
+            case FRAMES:
+              return asFrames(baseSequence);
+            default:
+              throw DruidException.defensive("Serialization mode[%s] not supported", serializationMode);
+          }
        }
 
        return baseQueryRunner.run(queryPlus, responseContext);
      }
+
+    /**
+     * Translates Sequence of RACs to a Sequence of Object[]
+     */
+    private static Sequence<Object[]> asRows(final Sequence<RowsAndColumns> baseSequence, final WindowOperatorQuery query)
+    {
+      final RowSignature rowSignature = query.getRowSignature();
+      return baseSequence.flatMap(
+          rac -> {
+            List<Object[]> results = new ArrayList<>(rac.numRows());
+
+            ColumnAccessor[] accessors = new ColumnAccessor[rowSignature.size()];
+            int index = 0;
+            for (String columnName : rowSignature.getColumnNames()) {
+              final Column column = rac.findColumn(columnName);
+              if (column == null) {
+                final ColumnType columnType = rowSignature
+                    .getColumnType(columnName)
+                    .orElse(ColumnType.UNKNOWN_COMPLEX);
+
+                accessors[index] = new NullColumn.Accessor(columnType, rac.numRows());
+              } else {
+                accessors[index] = column.toAccessor();
+              }
+              ++index;
+            }
+
+            for (int i = 0; i < rac.numRows(); ++i) {
+              Object[] objArr = new Object[accessors.length];
+              for (int j = 0; j < accessors.length; j++) {
+                objArr[j] = accessors[j].getObject(i);
+              }
+              results.add(objArr);
+            }
+
+            return Sequences.simple(results);
+          }
+      );
+    }
+
+    /**
+     * Translates a sequence of RACs to a Sequence of Frames
+     */
+    private static Sequence<FrameSignaturePair> asFrames(final Sequence<RowsAndColumns> baseSequence)
+    {
+      return baseSequence.map(
+          rac -> {
+            FrameMaker frameMaker = FrameMaker.fromRAC(rac);
+            return new FrameSignaturePair(
+                frameMaker.toColumnBasedFrame(),
+                frameMaker.computeSignature()
+            );
+          }
+      );
+    }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java
new file mode 100644
index 00000000000..2100b36e57d
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChestTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.ResultSerializationMode;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.LegacySegmentSpec;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+public class WindowOperatorQueryQueryToolChestTest extends InitializedNullHandlingTest
+{
+
+  private final WindowOperatorQueryQueryToolChest toolchest = new WindowOperatorQueryQueryToolChest();
+
+  @Test
+  public void mergeResultsWithRowResultSerializationMode()
+  {
+    RowSignature inputSignature = RowSignature.builder()
+                                              .add("length", ColumnType.LONG)
+                                              .build();
+    RowSignature outputSignature = RowSignature.builder()
+                                               .addAll(inputSignature)
+                                               .add("w0", ColumnType.LONG)
+                                               .build();
+
+    final WindowOperatorQuery query = new WindowOperatorQuery(
+        new QueryDataSource(
+            Druids.newScanQueryBuilder()
+                  .dataSource(new TableDataSource("test"))
+                  .intervals(new LegacySegmentSpec(Intervals.ETERNITY))
+                  .columns("length")
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .context(new HashMap<>())
+                  .build()
+        ),
+        new LegacySegmentSpec(Intervals.ETERNITY),
+        new HashMap<>(),
+        outputSignature,
+        ImmutableList.of(
+            new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+        ),
+        ImmutableList.of()
+    );
+    List results = toolchest.mergeResults(
+        (queryPlus, responseContext) -> Sequences.simple(
+            Collections.singletonList(
+                MapOfColumnsRowsAndColumns.fromMap(
+                    ImmutableMap.of("length", new IntArrayColumn(new int[]{1, 5, 10}))
+                )
+            )
+        )
+    ).run(QueryPlus.wrap(query)).toList();
+
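+    // No serialization mode is set in the query context, so the toolchest falls back to the default
+    // ROWS mode and materializes each result row as an Object[].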
+    Assert.assertTrue(results.get(0) instanceof Object[]);
+    Assert.assertEquals(3, results.size());
+    List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{1, 1},
+        new Object[]{5, 2},
+        new Object[]{10, 3}
+    );
+
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(expectedResults.get(i), (Object[]) results.get(i));
+    }
+  }
+
+  @Test
+  public void mergeResultsWithFrameResultSerializationMode()
+  {
+    RowSignature inputSignature = RowSignature.builder()
+                                              .add("length", ColumnType.LONG)
+                                              .build();
+    RowSignature outputSignature = RowSignature.builder()
+                                               .addAll(inputSignature)
+                                               .add("w0", ColumnType.LONG)
+                                               .build();
+
+    final WindowOperatorQuery query = new WindowOperatorQuery(
+        new QueryDataSource(
+            Druids.newScanQueryBuilder()
+                  .dataSource(new TableDataSource("test"))
+                  .intervals(new LegacySegmentSpec(Intervals.ETERNITY))
+                  .columns("length")
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .context(new HashMap<>())
+                  .build()
+        ),
+        new LegacySegmentSpec(Intervals.ETERNITY),
+        Collections.singletonMap(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, ResultSerializationMode.FRAMES.toString()),
+        outputSignature,
+        ImmutableList.of(
+            new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
+        ),
+        ImmutableList.of()
+    );
+    List results = toolchest.mergeResults(
+        (queryPlus, responseContext) -> Sequences.simple(
+            Collections.singletonList(
+                MapOfColumnsRowsAndColumns.fromMap(
+                    ImmutableMap.of("length", new IntArrayColumn(new int[]{1, 5, 10}))
+                )
+            )
+        )
+    ).run(QueryPlus.wrap(query)).toList();
+
+    Assert.assertTrue(results.get(0) instanceof FrameSignaturePair);
+    Assert.assertEquals(1, results.size());
+
+    FrameReader reader = FrameReader.create(((FrameSignaturePair) results.get(0)).getRowSignature());
+    List<List<Object>> resultRows = FrameTestUtil.readRowsFromCursorFactory(
+        reader.makeCursorFactory(((FrameSignaturePair) results.get(0)).getFrame())
+    ).toList();
+
+    List<List<Object>> expectedResults = ImmutableList.of(
+        ImmutableList.of(1L, 1L),
+        ImmutableList.of(5L, 2L),
+        ImmutableList.of(10L, 3L)
+    );
+    Assertions.assertEquals(expectedResults, resultRows);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index 990878eda6e..37ae14f56c3 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -56,6 +56,7 @@ import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.ResultLevelCachingQueryRunner;
+import org.apache.druid.query.ResultSerializationMode;
 import org.apache.druid.query.RetryQueryRunner;
 import org.apache.druid.query.RetryQueryRunnerConfig;
 import org.apache.druid.query.SegmentDescriptor;
@@ -359,7 +360,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
    * @param dryRun if true, does not actually execute any subqueries, but will inline empty result sets.
    */
   @SuppressWarnings({"rawtypes", "unchecked"}) // Subquery, toolchest, runner handling all use raw types
-  private DataSource inlineIfNecessary(
+  private DataSource inlineIfNecessary(
       final DataSource dataSource,
       @Nullable final QueryToolChest toolChestIfOutermost,
       final AtomicInteger subqueryRowLimitAccumulator,
@@ -434,11 +435,17 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
         if (dryRun) {
           queryResults = Sequences.empty();
         } else {
-          final QueryRunner subqueryRunner = subQuery.getRunner(this);
-          queryResults = subqueryRunner.run(
-              QueryPlus.wrap(subQuery),
-              DirectDruidClient.makeResponseContextForQuery()
+          Query subQueryWithSerialization = subQuery.withOverriddenContext(
+              Collections.singletonMap(
+                  ResultSerializationMode.CTX_SERIALIZATION_PARAMETER,
+                  ClientQuerySegmentWalkerUtils.getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get())
+                                               .serializationMode()
+                                               .toString()
+              )
           );
+          queryResults = subQueryWithSerialization
+              .getRunner(this)
+              .run(QueryPlus.wrap(subQueryWithSerialization), DirectDruidClient.makeResponseContextForQuery());
         }
 
         return toInlineDataSource(
@@ -647,14 +654,11 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
                        .collect(Collectors.toList()));
   }
 
-  /**
-   */
   /**
    *
    * Convert the results of a particular query into a materialized (List-based) InlineDataSource.
    *
    * @param query            the query
-   * @param results          query results
    * @param toolChest        toolchest for the query
    * @param limitAccumulator an accumulator for tracking the number of accumulated rows in all subqueries for a
    *                         particular master query
@@ -671,7 +675,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
    */
   private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
       final QueryType query,
-      final Sequence<T> results,
+      final Sequence<T> queryResults,
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
      final AtomicLong memoryLimitAccumulator,
@@ -697,7 +701,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
         subqueryStatsProvider.incrementSubqueriesWithRowLimit();
         dataSource = materializeResultsAsArray(
             query,
-            results,
+            queryResults,
             toolChest,
             limitAccumulator,
             limit,
@@ -713,7 +717,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
         }
         Optional<DataSource> maybeDataSource = materializeResultsAsFrames(
             query,
-            results,
+            queryResults,
             toolChest,
             limitAccumulator,
             memoryLimitAccumulator,
@@ -734,7 +738,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
           subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit();
           dataSource = materializeResultsAsArray(
               query,
-              results,
+              queryResults,
              toolChest,
              limitAccumulator,
              limit,
@@ -770,11 +774,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       final ServiceEmitter emitter
   )
   {
-    Optional<Sequence<FrameSignaturePair>> framesOptional;
-
     boolean startedAccumulating = false;
     try {
-      framesOptional = toolChest.resultsAsFrames(
+      Optional<Sequence<FrameSignaturePair>> framesOptional = toolChest.resultsAsFrames(
           query,
           results,
           new ArenaMemoryAllocatorFactory(FRAME_SIZE),
@@ -912,5 +914,4 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
         QueryContexts.MAX_SUBQUERY_ROWS_KEY
     );
   }
-
 }
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java
index 6667cd96112..0435ed23193 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.server;
 
+import org.apache.druid.query.ResultSerializationMode;
+
 /**
  * Utilities for {@link ClientQuerySegmentWalker}
  */
@@ -35,7 +37,13 @@ public class ClientQuerySegmentWalkerUtils
    * walker ensures that the cumulative number of rows of the results of subqueries of the given query donot exceed
    * the limit specified in the context or as the server default
    */
-  ROW_LIMIT,
+  ROW_LIMIT {
+    @Override
+    public ResultSerializationMode serializationMode()
+    {
+      return ResultSerializationMode.ROWS;
+    }
+  },
 
   /**
    * Subqueries limited by the BYTE_LIMIT are materialized as {@link org.apache.druid.frame.Frame}s on heap. Frames
@@ -44,10 +52,18 @@ public class ClientQuerySegmentWalkerUtils
    * Frames in the broker memory) of a given query do not exceed the limit specified in the context or as the server
    * default
    */
-  MEMORY_LIMIT
+  MEMORY_LIMIT {
+    @Override
+    public ResultSerializationMode serializationMode()
+    {
+      return ResultSerializationMode.FRAMES;
+    }
+  };
+
+  public abstract ResultSerializationMode serializationMode();
 }
 
-  /**
+  /**
   * Returns the limit type to be used for a given subquery.
   * It returns MEMORY_LIMIT only if:
   * 1. The user has enabled the 'maxSubqueryBytes' explicitly in the query context or as the server default
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 467f375f9f7..fa5585c374e 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -52,6 +52,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.QueryToolChestTestHelper;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.query.ResultSerializationMode;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnionDataSource;
@@ -1721,7 +1722,8 @@ public class ClientQuerySegmentWalkerTest
                   .put(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true)
                   .put(GroupingEngine.CTX_KEY_OUTERMOST, true)
                   .put(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP, "1979")
-                  .put(QueryContexts.QUERY_RESOURCE_ID, "dummy");
+                  .put(QueryContexts.QUERY_RESOURCE_ID, "dummy")
+                  .put(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, "blast");
 
     modifiedQuery = query.withOverriddenContext(contextBuilder.build());
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 676bf8b4dd4..4b26af38adb 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -35,12 +35,14 @@ import org.apache.druid.error.DruidException.Persona;
 import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.hll.VersionOneHyperLogLogCollector;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.Evals;
+import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.JoinDataSource;
@@ -982,23 +984,55 @@ public class BaseCalciteQueryTest extends CalciteTestBase
             mismatchMessage(row, column),
             (Float) expectedCell,
             (Float) resultCell,
-            ASSERTION_EPSILON);
+            ASSERTION_EPSILON
+        );
       } else if (expectedCell instanceof Double) {
         assertEquals(
             mismatchMessage(row, column),
             (Double) expectedCell,
             (Double) resultCell,
-            ASSERTION_EPSILON);
+            ASSERTION_EPSILON
+        );
+      } else if (expectedCell instanceof Object[] || expectedCell instanceof List) {
+        final Object[] expectedCellCasted = homogenizeArray(expectedCell);
+        final Object[] resultCellCasted = homogenizeArray(resultCell);
+        if (expectedCellCasted.length != resultCellCasted.length) {
+          throw new RE(
+              "Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]",
+              Arrays.toString(expectedCellCasted),
+              expectedCellCasted.length,
+              Arrays.toString(resultCellCasted),
+              resultCellCasted.length
+          );
+        }
+        for (int i = 0; i < expectedCellCasted.length; ++i) {
+          validate(row, column, type, expectedCellCasted[i], resultCellCasted[i]);
+        }
       } else {
         EQUALS.validate(row, column, type, expectedCell, resultCell);
       }
     }
   },
+
+  RELAX_NULLS_EPS {
+    @Override
+    void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell)
+    {
+      if (expectedCell == null) {
+        if (resultCell == null) {
+          return;
+        }
+        expectedCell = NullHandling.defaultValueForType(type);
+      }
+      EQUALS_EPS.validate(row, column, type, expectedCell, resultCell);
+    }
+  },
+
   /**
    * Comparision which accepts 1000 units of least precision.
    */
   EQUALS_RELATIVE_1000_ULPS {
-    static final int ASSERTION_ERROR_ULPS = 1000;
+    private static final int ASSERTION_ERROR_ULPS = 1000;
 
     @Override
     void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell)
@@ -1019,10 +1053,43 @@ public class BaseCalciteQueryTest extends CalciteTestBase
             (Double) resultCell,
             eps
         );
+      } else if (expectedCell instanceof Object[] || expectedCell instanceof List) {
+        final Object[] expectedCellCasted = homogenizeArray(expectedCell);
+        final Object[] resultCellCasted = homogenizeArray(resultCell);
+
+        if (expectedCellCasted.length != resultCellCasted.length) {
+          throw new RE(
+              "Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]",
+              Arrays.toString(expectedCellCasted),
+              expectedCellCasted.length,
+              Arrays.toString(resultCellCasted),
+              resultCellCasted.length
+          );
+        }
+        for (int i = 0; i < expectedCellCasted.length; ++i) {
+          validate(row, column, type, expectedCellCasted[i], resultCellCasted[i]);
+        }
       } else {
         EQUALS.validate(row, column, type, expectedCell, resultCell);
       }
     }
+  },
+
+  /**
+   * Relax nulls comparison which accepts 1000 units of least precision.
+   */
+  RELAX_NULLS_RELATIVE_1000_ULPS {
+    @Override
+    void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell)
+    {
+      if (expectedCell == null) {
+        if (resultCell == null) {
+          return;
+        }
+        expectedCell = NullHandling.defaultValueForType(type);
+      }
+      EQUALS_RELATIVE_1000_ULPS.validate(row, column, type, expectedCell, resultCell);
+    }
   };
 
   abstract void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell);
@@ -1032,6 +1099,15 @@ public class BaseCalciteQueryTest extends CalciteTestBase
     return StringUtils.format("column content mismatch at %d,%d", row, column);
   }
 
+  private static Object[] homogenizeArray(Object array)
+  {
+    if (array instanceof Object[]) {
+      return (Object[]) array;
+    } else if (array instanceof List) {
+      return ExprEval.coerceListToArray((List) array, true).rhs;
+    }
+    throw new ISE("Found array[%s] of type[%s] which is not handled", array.toString(), array.getClass().getName());
+  }
 }
 
 /**
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
index ccf459e743e..5850be0bd1c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
@@ -71,9 +71,17 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
 
   private static final Map<String, Object> DEFAULT_QUERY_CONTEXT = ImmutableMap.of(
       PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
-      QueryContexts.ENABLE_DEBUG, true
+      QueryContexts.ENABLE_DEBUG, true,
+      QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false
   );
 
+  private static final Map<String, Object> DEFAULT_QUERY_CONTEXT_WITH_SUBQUERY_BYTES =
+      ImmutableMap.<String, Object>builder()
+                  .putAll(DEFAULT_QUERY_CONTEXT)
+                  .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
+                  .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "0")
+                  .build();
+
   public static Object[] parametersForWindowQueryTest() throws Exception
   {
     final URL windowFolderUrl = ClassLoader.getSystemResource("calcite/tests/window");
@@ -161,7 +169,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
         }
       }
     }
-    assertResultsValid(ResultMatchMode.RELAX_NULLS, input.expectedResults, results);
+    assertResultsValid(ResultMatchMode.RELAX_NULLS_EPS, input.expectedResults, results);
   }
 
   private void validateOperators(List<OperatorFactory> expectedOperators, List<OperatorFactory> currentOperators)
@@ -223,6 +231,30 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
     }
   }
 
+  @MethodSource("parametersForWindowQueryTest")
+  @ParameterizedTest(name = "{0}")
+  @SuppressWarnings("unchecked")
+  public void windowQueryTestsWithSubqueryBytes(String filename) throws Exception
+  {
+    TestCase testCase = new TestCase(filename);
+
+    assumeTrue(testCase.getType() != TestType.failingTest);
+
+    if (testCase.getType() == TestType.operatorValidation) {
+      testBuilder()
+          .skipVectorize(true)
+          .sql(testCase.getSql())
+          .queryContext(
+              ImmutableMap.<String, Object>builder()
+                          .putAll(DEFAULT_QUERY_CONTEXT_WITH_SUBQUERY_BYTES)
+                          .putAll(testCase.getQueryContext())
+                          .build()
+          )
+          .addCustomVerification(QueryVerification.ofResults(testCase))
+          .run();
+    }
+  }
+
   @Test
   public void testWithArrayConcat()
   {
@@ -237,14 +269,14 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
         .expectedResults(
             ResultMatchMode.RELAX_NULLS,
             ImmutableList.of(
-                new Object[]{"Austria", null, "#de.wikipedia", "[\"abc\",\"#de.wikipedia\"]"},
-                new Object[]{"Republic of Korea", null, "#en.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
-                new Object[]{"Republic of Korea", null, "#ja.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
-                new Object[]{"Republic of Korea", null, "#ko.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
-                new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", "[\"abc\",\"#ko.wikipedia\"]"},
-                new Object[]{"Austria", "Vienna", "#de.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"},
-                new Object[]{"Austria", "Vienna", "#es.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"},
-                new Object[]{"Austria", "Vienna", "#tr.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"}
+                new Object[]{"Austria", null, "#de.wikipedia", ImmutableList.of("abc", "#de.wikipedia")},
+                new Object[]{"Republic of Korea", null, "#en.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")},
+                new Object[]{"Republic of Korea", null, "#ja.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")},
+                new Object[]{"Republic of Korea", null, "#ko.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")},
+                new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", ImmutableList.of("abc", "#ko.wikipedia")},
+                new Object[]{"Austria", "Vienna", "#de.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")},
+                new Object[]{"Austria", "Vienna", "#es.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")},
+                new Object[]{"Austria", "Vienna", "#tr.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")}
             )
         )
         .run();
diff --git a/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest b/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest
index bee3baeac0c..3cbdf42af6f 100644
--- a/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/arrayAggWithOrderBy.sqlTest
@@ -10,5 +10,5 @@ sql: |
   ORDER BY d1, f1, m1
 
 expectedResults:
-  - [2,"[1.0]","[1.0]","[1.0]"]
-  - [2,"[1.7]","[0.1]","[2.0]"]
+  - [2,[1.0],[1.0],[1.0]]
+  - [2,[1.7],[0.1],[2.0]]
diff --git a/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest b/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest
index 9ec451a94d9..2a648abc17b 100644
--- a/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/arrayConcatAgg.sqlTest
@@ -9,5 +9,5 @@ sql: |
   GROUP BY cityName
 
 expectedResults:
-  - ["Horsching","[\"Horsching\"]"]
-  - ["Vienna","[\"Vienna\"]"]
+  - ["Horsching",["Horsching"]]
+  - ["Vienna",["Vienna"]]
diff --git a/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest
index 1e4de22dfca..0e66ed87460 100644
--- a/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest
@@ -1,8 +1,5 @@
 type: "operatorValidation"
 
-queryContext:
-  maxSubqueryBytes: 100000
-
 sql: |
   SELECT
     __time