Support maxSubqueryBytes for window functions (#16800)

Window queries now respect maxSubqueryBytes.
Laksh Singla 2024-09-17 10:06:24 +05:30 committed by GitHub
parent 2e2f3cf66a
commit bb487a4193
11 changed files with 461 additions and 82 deletions
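
For orientation, a hedged sketch of the context settings this commit makes effective for window queries, mirroring DEFAULT_QUERY_CONTEXT_WITH_SUBQUERY_BYTES in CalciteWindowQueryTest below. The keys are the string values behind QueryContexts.MAX_SUBQUERY_BYTES_KEY and QueryContexts.MAX_SUBQUERY_ROWS_KEY; the numbers are illustrative, not defaults.

import com.google.common.collect.ImmutableMap;
import java.util.Map;

// Illustrative query context: limit materialized subquery results by bytes
// (on-heap frames) rather than by row count. Values are examples only.
Map<String, Object> subqueryLimitContext = ImmutableMap.<String, Object>of(
    "maxSubqueryBytes", "100000", // enable byte-based subquery limiting
    "maxSubqueryRows", 0          // rely on the byte limit instead of rows
);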

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
/**
* Serialization medium for query results on the broker. It is currently used to communicate the results' format between
* the main query processing walker and the individual toolchests while materializing a subquery's rows.
*/
public enum ResultSerializationMode
{
/**
* Materialize the inner results as rows
*/
ROWS,
/**
* Materialize the inner results as frames
*/
FRAMES;
public static final String CTX_SERIALIZATION_PARAMETER = "serialization";
}
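
As wired up later in this commit, the broker sets this parameter on the subquery's context and the toolchest reads it back. A minimal sketch of the read side, given a WindowOperatorQuery named query; it matches the QueryContext#getEnum usage in WindowOperatorQueryQueryToolChest below:

// Resolve the serialization mode from the query context, defaulting to
// ROWS when the broker did not set the parameter.
ResultSerializationMode mode = query.context().getEnum(
    ResultSerializationMode.CTX_SERIALIZATION_PARAMETER,
    ResultSerializationMode.class,
    ResultSerializationMode.ROWS
);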

View File

@ -23,24 +23,30 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ResultSerializationMode;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.NullColumn;
import org.apache.druid.query.rowsandcols.semantic.FrameMaker;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndColumns, WindowOperatorQuery>
@ -50,7 +56,7 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCol
@SuppressWarnings("unchecked")
public QueryRunner<RowsAndColumns> mergeResults(QueryRunner<RowsAndColumns> runner)
{
return new RowsAndColumnsUnravelingQueryRunner(
return new RowsAndColumnsSerializingQueryRunner(
(queryPlus, responseContext) -> {
final WindowOperatorQuery query = (WindowOperatorQuery) queryPlus.getQuery();
final List<OperatorFactory> opFactories = query.getOperators();
@ -61,7 +67,7 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCol
Supplier<Operator> opSupplier = () -> {
Operator retVal = new SequenceOperator(
runner.run(
queryPlus.withQuery(query.withOperators(new ArrayList<OperatorFactory>())),
queryPlus.withQuery(query.withOperators(new ArrayList<>())),
responseContext
)
);
@ -112,16 +118,29 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCol
Sequence<RowsAndColumns> resultSequence
)
{
// Dark magic; see RowsAndColumnsUnravelingQueryRunner.
// Dark magic; see RowsAndColumnsSerializingQueryRunner.
return (Sequence) resultSequence;
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
WindowOperatorQuery query,
Sequence<RowsAndColumns> resultSequence,
MemoryAllocatorFactory memoryAllocatorFactory,
boolean useNestedForUnknownTypes
)
{
// see RowsAndColumnsSerializingQueryRunner
return Optional.of((Sequence) resultSequence);
}
/**
* This class exists to unravel the RowsAndColumns that are used in this query and make it the return Sequence
* actually be a Sequence of rows. This is relatively broken in a number of regards, the most obvious of which
* is that it is going to run counter to the stated class on the Generic of the QueryToolChest. That is, the
* code makes it look like you are getting a Sequence of RowsAndColumns, but, by using this, the query will
* actually ultimately produce a Sequence of Object[]. This works because of type Erasure in Java (it's all Object
* This class exists to serialize the RowsAndColumns that are used in this query and make the returned Sequence
* actually be a Sequence of rows or frames, as the query requires.
* This is relatively broken in a number of regards, the most obvious of which is that it runs counter to the stated class on the generic of the QueryToolChest.
* That is, the code makes it look like you are getting a Sequence of RowsAndColumns, but, by using this, the query will
* actually ultimately produce a Sequence of Object[] or Frames. This works because of type erasure in Java (it's all Object
* at the end of the day).
* <p>
* While it might seem like this will break all sorts of things, the Generic type is actually there more as a type
@ -132,12 +151,12 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCol
* Not our proudest moment, but we use the tools available to us.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static class RowsAndColumnsUnravelingQueryRunner implements QueryRunner
private static class RowsAndColumnsSerializingQueryRunner implements QueryRunner
{
private final QueryRunner<RowsAndColumns> baseQueryRunner;
private RowsAndColumnsUnravelingQueryRunner(
private RowsAndColumnsSerializingQueryRunner(
QueryRunner<RowsAndColumns> baseQueryRunner
)
{
@ -158,42 +177,77 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCol
queryPlus.withQuery(query.withOverriddenContext(ImmutableMap.of("unravel", false))),
responseContext
);
final RowSignature rowSignature = query.getRowSignature();
return baseSequence.flatMap(
rac -> {
List<Object[]> results = new ArrayList<>(rac.numRows());
ColumnAccessor[] accessors = new ColumnAccessor[rowSignature.size()];
int index = 0;
for (String columnName : rowSignature.getColumnNames()) {
final Column column = rac.findColumn(columnName);
if (column == null) {
final ColumnType columnType = rowSignature
.getColumnType(columnName)
.orElse(ColumnType.UNKNOWN_COMPLEX);
accessors[index] = new NullColumn.Accessor(columnType, rac.numRows());
} else {
accessors[index] = column.toAccessor();
}
++index;
}
for (int i = 0; i < rac.numRows(); ++i) {
Object[] objArr = new Object[accessors.length];
for (int j = 0; j < accessors.length; j++) {
objArr[j] = accessors[j].getObject(i);
}
results.add(objArr);
}
return Sequences.simple(results);
}
final ResultSerializationMode serializationMode = query.context().getEnum(
ResultSerializationMode.CTX_SERIALIZATION_PARAMETER,
ResultSerializationMode.class,
ResultSerializationMode.ROWS
);
switch (serializationMode) {
case ROWS:
return asRows(baseSequence, query);
case FRAMES:
return asFrames(baseSequence);
default:
throw DruidException.defensive("Serialization mode[%s] not supported", serializationMode);
}
}
return baseQueryRunner.run(queryPlus, responseContext);
}
/**
* Translates a Sequence of RACs to a Sequence of Object[]
*/
private static Sequence asRows(final Sequence<RowsAndColumns> baseSequence, final WindowOperatorQuery query)
{
final RowSignature rowSignature = query.getRowSignature();
return baseSequence.flatMap(
rac -> {
List<Object[]> results = new ArrayList<>(rac.numRows());
ColumnAccessor[] accessors = new ColumnAccessor[rowSignature.size()];
int index = 0;
for (String columnName : rowSignature.getColumnNames()) {
final Column column = rac.findColumn(columnName);
if (column == null) {
final ColumnType columnType = rowSignature
.getColumnType(columnName)
.orElse(ColumnType.UNKNOWN_COMPLEX);
accessors[index] = new NullColumn.Accessor(columnType, rac.numRows());
} else {
accessors[index] = column.toAccessor();
}
++index;
}
for (int i = 0; i < rac.numRows(); ++i) {
Object[] objArr = new Object[accessors.length];
for (int j = 0; j < accessors.length; j++) {
objArr[j] = accessors[j].getObject(i);
}
results.add(objArr);
}
return Sequences.simple(results);
}
);
}
/**
* Translates a Sequence of RACs to a Sequence of Frames
*/
private static Sequence asFrames(final Sequence<RowsAndColumns> baseSequence)
{
return baseSequence.map(
rac -> {
FrameMaker frameMaker = FrameMaker.fromRAC(rac);
return new FrameSignaturePair(
frameMaker.toColumnBasedFrame(),
frameMaker.computeSignature()
);
}
);
}
}
}
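
To make the javadoc's erasure point concrete, a toy consumer-side sketch with hypothetical variables (toolChest, runner, frameQuery, responseContext): the generic parameter exists only at compile time, so the caller that requested FRAMES casts the erased Sequence to the element type it knows it will receive.

// Hypothetical sketch: the Sequence is declared over RowsAndColumns, but
// after this runner it actually carries FrameSignaturePair elements, so a
// raw cast is safe for the caller that set the FRAMES context parameter.
@SuppressWarnings({"unchecked", "rawtypes"})
Sequence<FrameSignaturePair> frames = (Sequence) toolChest
    .mergeResults(runner)
    .run(QueryPlus.wrap(frameQuery), responseContext);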

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.ResultSerializationMode;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
public class WindowOperatorQueryQueryToolChestTest extends InitializedNullHandlingTest
{
private final WindowOperatorQueryQueryToolChest toolchest = new WindowOperatorQueryQueryToolChest();
@Test
public void mergeResultsWithRowResultSerializationMode()
{
RowSignature inputSignature = RowSignature.builder()
.add("length", ColumnType.LONG)
.build();
RowSignature outputSignature = RowSignature.builder()
.addAll(inputSignature)
.add("w0", ColumnType.LONG)
.build();
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("test"))
.intervals(new LegacySegmentSpec(Intervals.ETERNITY))
.columns("length")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(new HashMap<>())
.build()
),
new LegacySegmentSpec(Intervals.ETERNITY),
new HashMap<>(),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
ImmutableList.of()
);
List results = toolchest.mergeResults(
(queryPlus, responseContext) -> Sequences.simple(
Collections.singletonList(
MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of("length", new IntArrayColumn(new int[]{1, 5, 10}))
)
)
)
).run(QueryPlus.wrap(query)).toList();
Assert.assertTrue(results.get(0) instanceof Object[]);
Assert.assertEquals(3, results.size());
List<Object[]> expectedResults = ImmutableList.of(
new Object[]{1, 1},
new Object[]{5, 2},
new Object[]{10, 3}
);
for (int i = 0; i < 3; ++i) {
Assert.assertArrayEquals(expectedResults.get(i), (Object[]) results.get(i));
}
}
@Test
public void mergeResultsWithFrameResultSerializationMode()
{
RowSignature inputSignature = RowSignature.builder()
.add("length", ColumnType.LONG)
.build();
RowSignature outputSignature = RowSignature.builder()
.addAll(inputSignature)
.add("w0", ColumnType.LONG)
.build();
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("test"))
.intervals(new LegacySegmentSpec(Intervals.ETERNITY))
.columns("length")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(new HashMap<>())
.build()
),
new LegacySegmentSpec(Intervals.ETERNITY),
Collections.singletonMap(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, ResultSerializationMode.FRAMES.toString()),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
ImmutableList.of()
);
List results = toolchest.mergeResults(
(queryPlus, responseContext) -> Sequences.simple(
Collections.singletonList(
MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of("length", new IntArrayColumn(new int[]{1, 5, 10}))
)
)
)
).run(QueryPlus.wrap(query)).toList();
Assert.assertTrue(results.get(0) instanceof FrameSignaturePair);
Assert.assertEquals(1, results.size());
FrameReader reader = FrameReader.create(((FrameSignaturePair) results.get(0)).getRowSignature());
List<List<Object>> resultRows = FrameTestUtil.readRowsFromCursorFactory(
reader.makeCursorFactory(((FrameSignaturePair) results.get(0)).getFrame())
).toList();
List<List<Object>> expectedResults = ImmutableList.of(
ImmutableList.of(1L, 1L),
ImmutableList.of(5L, 2L),
ImmutableList.of(10L, 3L)
);
Assertions.assertEquals(expectedResults, resultRows);
}
}

View File

@ -56,6 +56,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.ResultLevelCachingQueryRunner;
import org.apache.druid.query.ResultSerializationMode;
import org.apache.druid.query.RetryQueryRunner;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
@ -359,7 +360,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
* @param dryRun if true, does not actually execute any subqueries, but will inline empty result sets.
*/
@SuppressWarnings({"rawtypes", "unchecked"}) // Subquery, toolchest, runner handling all use raw types
private DataSource inlineIfNecessary(
private <T> DataSource inlineIfNecessary(
final DataSource dataSource,
@Nullable final QueryToolChest toolChestIfOutermost,
final AtomicInteger subqueryRowLimitAccumulator,
@ -434,11 +435,17 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
if (dryRun) {
queryResults = Sequences.empty();
} else {
final QueryRunner subqueryRunner = subQuery.getRunner(this);
queryResults = subqueryRunner.run(
QueryPlus.wrap(subQuery),
DirectDruidClient.makeResponseContextForQuery()
Query subQueryWithSerialization = subQuery.withOverriddenContext(
Collections.singletonMap(
ResultSerializationMode.CTX_SERIALIZATION_PARAMETER,
ClientQuerySegmentWalkerUtils.getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get())
.serializationMode()
.toString()
)
);
queryResults = subQueryWithSerialization
.getRunner(this)
.run(QueryPlus.wrap(subQueryWithSerialization), DirectDruidClient.makeResponseContextForQuery());
}
return toInlineDataSource(
@ -647,14 +654,11 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
.collect(Collectors.toList()));
}
/**
*
* Convert the results of a particular query into a materialized (List-based) InlineDataSource.
*
* @param query the query
* @param results query results
* @param toolChest toolchest for the query
* @param limitAccumulator an accumulator for tracking the number of accumulated rows in all subqueries for a
* particular master query
@ -671,7 +675,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
*/
private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
final QueryType query,
final Sequence<T> results,
final Sequence<T> queryResults,
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,
final AtomicLong memoryLimitAccumulator,
@ -697,7 +701,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
subqueryStatsProvider.incrementSubqueriesWithRowLimit();
dataSource = materializeResultsAsArray(
query,
results,
queryResults,
toolChest,
limitAccumulator,
limit,
@ -713,7 +717,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
}
Optional<DataSource> maybeDataSource = materializeResultsAsFrames(
query,
results,
queryResults,
toolChest,
limitAccumulator,
memoryLimitAccumulator,
@ -734,7 +738,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit();
dataSource = materializeResultsAsArray(
query,
results,
queryResults,
toolChest,
limitAccumulator,
limit,
@ -770,11 +774,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final ServiceEmitter emitter
)
{
Optional<Sequence<FrameSignaturePair>> framesOptional;
boolean startedAccumulating = false;
try {
framesOptional = toolChest.resultsAsFrames(
Optional<Sequence<FrameSignaturePair>> framesOptional = toolChest.resultsAsFrames(
query,
results,
new ArenaMemoryAllocatorFactory(FRAME_SIZE),
@ -912,5 +914,4 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
QueryContexts.MAX_SUBQUERY_ROWS_KEY
);
}
}
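
The hunks above implement a frames-first policy with a row fallback. A self-contained, hypothetical condensation of that control flow (names invented for the sketch; the real logic lives in toInlineDataSource):

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Frames first; if the results cannot be materialized as frames (e.g. an
// unsupported toolchest), count the fallback and materialize as rows.
static <T> List<T> materializeWithFallback(
    Supplier<Optional<List<T>>> frameMaterializer,
    Supplier<List<T>> rowMaterializer,
    Runnable fallbackCounter // mirrors incrementSubqueriesFallingBackToRowLimit()
)
{
  Optional<List<T>> frames = frameMaterializer.get();
  if (frames.isPresent()) {
    return frames.get();
  }
  fallbackCounter.run();
  return rowMaterializer.get();
}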

View File

@ -19,6 +19,8 @@
package org.apache.druid.server;
import org.apache.druid.query.ResultSerializationMode;
/**
* Utilities for {@link ClientQuerySegmentWalker}
*/
@ -35,7 +37,13 @@ public class ClientQuerySegmentWalkerUtils
* walker ensures that the cumulative number of rows of the results of subqueries of the given query do not exceed
* the limit specified in the context or as the server default
*/
ROW_LIMIT,
ROW_LIMIT {
@Override
public ResultSerializationMode serializationMode()
{
return ResultSerializationMode.ROWS;
}
},
/**
* Subqueries limited by the BYTE_LIMIT are materialized as {@link org.apache.druid.frame.Frame}s on heap. Frames
@ -44,10 +52,18 @@ public class ClientQuerySegmentWalkerUtils
* Frames in the broker memory) of a given query do not exceed the limit specified in the context or as the server
* default
*/
MEMORY_LIMIT
MEMORY_LIMIT {
@Override
public ResultSerializationMode serializationMode()
{
return ResultSerializationMode.FRAMES;
}
};
public abstract ResultSerializationMode serializationMode();
}
/**
* Returns the limit type to be used for a given subquery.
* It returns MEMORY_LIMIT only if:
* 1. The user has enabled the 'maxSubqueryBytes' explicitly in the query context or as the server default
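
Given the call site in ClientQuerySegmentWalker above (getLimitType(maxSubqueryMemory, cannotMaterializeToFrames.get())), a hedged sketch of the decision this enum backs. The exact guards sit in the portion of this file the diff elides; both the SubqueryResultLimit enum name and the non-positive-means-unset convention are assumptions here.

// Assumed shape of getLimitType: byte-based limiting only when a byte
// limit is configured and the results can be turned into frames.
public static SubqueryResultLimit getLimitType(long maxSubqueryMemory, boolean cannotMaterializeToFrames)
{
  // Assumption: non-positive maxSubqueryMemory means no byte limit was set.
  if (maxSubqueryMemory <= 0 || cannotMaterializeToFrames) {
    return SubqueryResultLimit.ROW_LIMIT;
  }
  return SubqueryResultLimit.MEMORY_LIMIT;
}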

View File

@ -52,6 +52,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChestTestHelper;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.ResultSerializationMode;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
@ -1721,7 +1722,8 @@ public class ClientQuerySegmentWalkerTest
.put(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true)
.put(GroupingEngine.CTX_KEY_OUTERMOST, true)
.put(GroupingEngine.CTX_KEY_FUDGE_TIMESTAMP, "1979")
.put(QueryContexts.QUERY_RESOURCE_ID, "dummy");
.put(QueryContexts.QUERY_RESOURCE_ID, "dummy")
.put(ResultSerializationMode.CTX_SERIALIZATION_PARAMETER, "blast");
modifiedQuery = query.withOverriddenContext(contextBuilder.build());

View File

@ -35,12 +35,14 @@ import org.apache.druid.error.DruidException.Persona;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.hll.VersionOneHyperLogLogCollector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.Evals;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.JoinDataSource;
@ -982,23 +984,55 @@ public class BaseCalciteQueryTest extends CalciteTestBase
mismatchMessage(row, column),
(Float) expectedCell,
(Float) resultCell,
ASSERTION_EPSILON);
ASSERTION_EPSILON
);
} else if (expectedCell instanceof Double) {
assertEquals(
mismatchMessage(row, column),
(Double) expectedCell,
(Double) resultCell,
ASSERTION_EPSILON);
ASSERTION_EPSILON
);
} else if (expectedCell instanceof Object[] || expectedCell instanceof List) {
final Object[] expectedCellCasted = homogenizeArray(expectedCell);
final Object[] resultCellCasted = homogenizeArray(resultCell);
if (expectedCellCasted.length != resultCellCasted.length) {
throw new RE(
"Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]",
Arrays.toString(expectedCellCasted),
expectedCellCasted.length,
Arrays.toString(resultCellCasted),
resultCellCasted.length
);
}
for (int i = 0; i < expectedCellCasted.length; ++i) {
validate(row, column, type, expectedCellCasted[i], resultCellCasted[i]);
}
} else {
EQUALS.validate(row, column, type, expectedCell, resultCell);
}
}
},
RELAX_NULLS_EPS {
@Override
void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell)
{
if (expectedCell == null) {
if (resultCell == null) {
return;
}
expectedCell = NullHandling.defaultValueForType(type);
}
EQUALS_EPS.validate(row, column, type, expectedCell, resultCell);
}
},
/**
* Comparison which accepts 1000 units of least precision.
*/
EQUALS_RELATIVE_1000_ULPS {
static final int ASSERTION_ERROR_ULPS = 1000;
private static final int ASSERTION_ERROR_ULPS = 1000;
@Override
void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell)
@ -1019,10 +1053,43 @@ public class BaseCalciteQueryTest extends CalciteTestBase
(Double) resultCell,
eps
);
} else if (expectedCell instanceof Object[] || expectedCell instanceof List) {
final Object[] expectedCellCasted = homogenizeArray(expectedCell);
final Object[] resultCellCasted = homogenizeArray(resultCell);
if (expectedCellCasted.length != resultCellCasted.length) {
throw new RE(
"Mismatched array lengths: expected[%s] with length[%d], actual[%s] with length[%d]",
Arrays.toString(expectedCellCasted),
expectedCellCasted.length,
Arrays.toString(resultCellCasted),
resultCellCasted.length
);
}
for (int i = 0; i < expectedCellCasted.length; ++i) {
validate(row, column, type, expectedCellCasted[i], resultCellCasted[i]);
}
} else {
EQUALS.validate(row, column, type, expectedCell, resultCell);
}
}
},
/**
* Like RELAX_NULLS, but accepts 1000 units of least precision.
*/
RELAX_NULLS_RELATIVE_1000_ULPS {
@Override
void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell)
{
if (expectedCell == null) {
if (resultCell == null) {
return;
}
expectedCell = NullHandling.defaultValueForType(type);
}
EQUALS_RELATIVE_1000_ULPS.validate(row, column, type, expectedCell, resultCell);
}
};
abstract void validate(int row, int column, ValueType type, Object expectedCell, Object resultCell);
@ -1032,6 +1099,15 @@ public class BaseCalciteQueryTest extends CalciteTestBase
return StringUtils.format("column content mismatch at %d,%d", row, column);
}
private static Object[] homogenizeArray(Object array)
{
if (array instanceof Object[]) {
return (Object[]) array;
} else if (array instanceof List) {
return ExprEval.coerceListToArray((List) array, true).rhs;
}
throw new ISE("Found array[%s] of type[%s] which is not handled", array.toString(), array.getClass().getName());
}
}
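
For reference, a minimal sketch of what the 1000-ULP tolerance in the validators above means: Math.ulp(x) is the spacing between adjacent doubles at x, so the accepted error scales with the magnitude of the expected value. The helper name is invented for the sketch.

// Accept results within maxUlps units in the last place of the expected
// value; with maxUlps = 1000 this is in the spirit of ASSERTION_ERROR_ULPS.
static boolean withinUlps(double expected, double actual, int maxUlps)
{
  final double eps = Math.ulp(expected) * maxUlps;
  return Math.abs(expected - actual) <= eps;
}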
/**

View File

@ -71,9 +71,17 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
private static final Map<String, Object> DEFAULT_QUERY_CONTEXT = ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false
);
private static final Map<String, Object> DEFAULT_QUERY_CONTEXT_WITH_SUBQUERY_BYTES =
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_QUERY_CONTEXT)
.put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
.put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "0")
.build();
public static Object[] parametersForWindowQueryTest() throws Exception
{
final URL windowFolderUrl = ClassLoader.getSystemResource("calcite/tests/window");
@ -161,7 +169,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
}
}
}
assertResultsValid(ResultMatchMode.RELAX_NULLS, input.expectedResults, results);
assertResultsValid(ResultMatchMode.RELAX_NULLS_EPS, input.expectedResults, results);
}
private void validateOperators(List<OperatorFactory> expectedOperators, List<OperatorFactory> currentOperators)
@ -223,6 +231,30 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
}
}
@MethodSource("parametersForWindowQueryTest")
@ParameterizedTest(name = "{0}")
@SuppressWarnings("unchecked")
public void windowQueryTestsWithSubqueryBytes(String filename) throws Exception
{
TestCase testCase = new TestCase(filename);
assumeTrue(testCase.getType() != TestType.failingTest);
if (testCase.getType() == TestType.operatorValidation) {
testBuilder()
.skipVectorize(true)
.sql(testCase.getSql())
.queryContext(
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_QUERY_CONTEXT_WITH_SUBQUERY_BYTES)
.putAll(testCase.getQueryContext())
.build()
)
.addCustomVerification(QueryVerification.ofResults(testCase))
.run();
}
}
@Test
public void testWithArrayConcat()
{
@ -237,14 +269,14 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
.expectedResults(
ResultMatchMode.RELAX_NULLS,
ImmutableList.of(
new Object[]{"Austria", null, "#de.wikipedia", "[\"abc\",\"#de.wikipedia\"]"},
new Object[]{"Republic of Korea", null, "#en.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Republic of Korea", null, "#ja.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Republic of Korea", null, "#ko.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", "[\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Austria", "Vienna", "#de.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"},
new Object[]{"Austria", "Vienna", "#es.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"},
new Object[]{"Austria", "Vienna", "#tr.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"}
new Object[]{"Austria", null, "#de.wikipedia", ImmutableList.of("abc", "#de.wikipedia")},
new Object[]{"Republic of Korea", null, "#en.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")},
new Object[]{"Republic of Korea", null, "#ja.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")},
new Object[]{"Republic of Korea", null, "#ko.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#en.wikipedia", "abc", "#ja.wikipedia", "abc", "#ko.wikipedia")},
new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", ImmutableList.of("abc", "#ko.wikipedia")},
new Object[]{"Austria", "Vienna", "#de.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")},
new Object[]{"Austria", "Vienna", "#es.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")},
new Object[]{"Austria", "Vienna", "#tr.wikipedia", ImmutableList.of("abc", "#de.wikipedia", "abc", "#es.wikipedia", "abc", "#tr.wikipedia")}
)
)
.run();

View File

@ -10,5 +10,5 @@ sql: |
ORDER BY d1, f1, m1
expectedResults:
- [2,"[1.0]","[1.0]","[1.0]"]
- [2,"[1.7]","[0.1]","[2.0]"]
- [2,[1.0],[1.0],[1.0]]
- [2,[1.7],[0.1],[2.0]]

View File

@ -9,5 +9,5 @@ sql: |
GROUP BY cityName
expectedResults:
- ["Horsching","[\"Horsching\"]"]
- ["Vienna","[\"Vienna\"]"]
- ["Horsching",["Horsching"]]
- ["Vienna",["Vienna"]]

View File

@ -1,8 +1,5 @@
type: "operatorValidation"
queryContext:
maxSubqueryBytes: 100000
sql: |
SELECT
__time