mirror of https://github.com/apache/druid.git
Window function on msq (#15470)
This PR aims to introduce window functions on MSQ by doing the following:
- Introduce a window QueryKit for handling window queries, along with its factory and a processor for window queries.
- If a window operator is present with a PARTITION BY clause, push the partition as a shuffle spec of the previous stage.
- In the presence of an empty OVER() clause, let all operators loose on a single RAC.
- In the presence of a non-empty OVER() clause, break down each window into an individual stage.
- Associated machinery to handle window functions in MSQ.
- Introduce a separate hidden engine feature, WINDOW_LEAF_OPERATOR, which is set only for the MSQ engine. In the presence of this feature, the planner plans without the leaf operators by creating a window query over an inner scan query. In the native case this is set to false and the planner generates the leafOperators.
- Guardrails around materialization.
- Comprehensive UTs.
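As a rough illustration of the two planning paths described above, here is a hypothetical sketch (not part of this change; table and column names are made up) of the SQL shapes that trigger each path:

```java
// Non-empty OVER with PARTITION BY: the partition columns are pushed down as the
// hash-shuffle key of the stage feeding the window stage, so each worker receives
// complete partitions.
final String partitionedWindow =
    "SELECT m1, SUM(m2) OVER (PARTITION BY m1 ORDER BY __time) FROM foo";

// Empty OVER(): there is no partition key to shuffle on, so every row is concatenated
// into a single RowsAndColumns on one worker and all operators run over it, subject to
// the maxRowsMaterializedInWindow guardrail.
final String globalWindow =
    "SELECT m1, SUM(m2) OVER () FROM foo";
```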
This commit is contained in:
parent
7649957710
commit
524842a3bb
@ -62,3 +62,9 @@ properties, and the `indexSpec` [`tuningConfig`](../ingestion/ingestion-spec.md#
- `EXTERN` with input sources that match large numbers of files may exhaust available memory on the controller task.
- `EXTERN` refers to external files. Use `FROM` to access `druid` input sources.

## `WINDOW` Function

- The maximum number of elements in a window cannot exceed a value of 100,000.
- To avoid `leafOperators` in the MSQ engine, window functions have an extra scan stage after the window stage for cases where the native engine has a non-empty `leafOperator`.
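To make the materialization guardrail above concrete, the following hypothetical sketch (not part of this change) shows how a window that exceeds the limit surfaces as the new fault, and how the limit can be raised for a single query through the `maxRowsMaterializedInWindow` context parameter; the `rowsInCurrentWindow` variable is illustrative only:

```java
// The default limit comes from Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW (100,000 rows).
final int limit = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
if (rowsInCurrentWindow > limit) {
  // A single partition (or the whole input, for an empty OVER()) grew beyond the guardrail.
  throw new MSQException(new TooManyRowsInAWindowFault(rowsInCurrentWindow, limit));
}

// Raising the limit for one query via the query context. Higher values mean more rows are
// materialized in memory and increase the risk of OutOfMemory errors.
final Map<String, Object> context = ImmutableMap.<String, Object>of(
    PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
    MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200_000
);
```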
@ -76,7 +76,6 @@ import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
|
|||
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
|
||||
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
|
||||
import org.apache.druid.sql.calcite.filtration.Filtration;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.druid.sql.calcite.util.CacheTestHelperModule.ResultCacheMode;
|
||||
import org.apache.druid.sql.calcite.util.CalciteTests;
|
||||
import org.apache.druid.sql.calcite.util.TestDataBuilder;
|
||||
|
@ -1164,7 +1163,6 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
|
|||
public void testHllWithOrderedWindowing()
|
||||
{
|
||||
testBuilder()
|
||||
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
|
||||
.sql(
|
||||
"SELECT dim1,coalesce(cast(l1 as integer),-999),"
|
||||
+ " HLL_SKETCH_ESTIMATE( DS_HLL(dim1) OVER ( ORDER BY l1 ), true)"
|
||||
|
@ -1191,7 +1189,6 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
|
|||
skipVectorize();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
testBuilder()
|
||||
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
|
||||
.sql(
|
||||
"SELECT "
|
||||
+ " TIME_FLOOR(__time, 'P1D') as dayLvl,\n"
|
||||
|
|
|
@ -167,6 +167,7 @@ import org.apache.druid.msq.querykit.MultiQueryKit;
|
|||
import org.apache.druid.msq.querykit.QueryKit;
|
||||
import org.apache.druid.msq.querykit.QueryKitUtils;
|
||||
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
|
||||
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
|
||||
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
|
||||
import org.apache.druid.msq.querykit.results.ExportResultsFrameProcessorFactory;
|
||||
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
|
||||
|
@ -186,6 +187,7 @@ import org.apache.druid.query.QueryContext;
|
|||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.groupby.GroupByQuery;
|
||||
import org.apache.druid.query.groupby.GroupByQueryConfig;
|
||||
import org.apache.druid.query.operator.WindowOperatorQuery;
|
||||
import org.apache.druid.query.scan.ScanQuery;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
|
@ -1201,6 +1203,7 @@ public class ControllerImpl implements Controller
|
|||
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
|
||||
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
|
||||
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
|
||||
.put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper()))
|
||||
.build();
|
||||
|
||||
return new MultiQueryKit(kitMap);
|
||||
|
@ -2769,7 +2772,6 @@ public class ControllerImpl implements Controller
|
|||
if (isFailOnEmptyInsertEnabled && Boolean.TRUE.equals(isShuffleStageOutputEmpty)) {
|
||||
throw new MSQException(new InsertCannotBeEmptyFault(task.getDataSource()));
|
||||
}
|
||||
|
||||
final ClusterByPartitions partitionBoundaries =
|
||||
queryKernel.getResultPartitionBoundariesForStage(shuffleStageId);
|
||||
|
||||
|
|
|
@ -96,4 +96,11 @@ public class Limits
|
|||
* Max number of partition buckets for ingestion queries.
|
||||
*/
|
||||
public static final int MAX_PARTITION_BUCKETS = 5_000;
|
||||
|
||||
/**
|
||||
* Max number of rows with the same key in a window. This acts as a guardrail for
|
||||
* data distribution with high cardinality
|
||||
*/
|
||||
public static final int MAX_ROWS_MATERIALIZED_IN_WINDOW = 100_000;
|
||||
|
||||
}
|
||||
|
|
|
@ -84,6 +84,7 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
|
|||
import org.apache.druid.msq.input.table.TableInputSpec;
|
||||
import org.apache.druid.msq.kernel.NilExtraInfoHolder;
|
||||
import org.apache.druid.msq.querykit.InputNumberDataSource;
|
||||
import org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessorFactory;
|
||||
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
|
||||
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
|
||||
import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleFrameProcessorFactory;
|
||||
|
@ -159,6 +160,7 @@ public class MSQIndexingModule implements DruidModule
|
|||
NilExtraInfoHolder.class,
|
||||
SortMergeJoinFrameProcessorFactory.class,
|
||||
QueryResultFrameProcessorFactory.class,
|
||||
WindowOperatorQueryFrameProcessorFactory.class,
|
||||
ExportResultsFrameProcessorFactory.class,
|
||||
|
||||
// DataSource classes (note: ExternalDataSource is in MSQSqlModule)
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.indexing.error;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import org.apache.druid.msq.util.MultiStageQueryContext;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@JsonTypeName(TooManyRowsInAWindowFault.CODE)
|
||||
public class TooManyRowsInAWindowFault extends BaseMSQFault
|
||||
{
|
||||
|
||||
static final String CODE = "TooManyRowsInAWindow";
|
||||
|
||||
private final int numRows;
|
||||
private final int maxRows;
|
||||
|
||||
@JsonCreator
|
||||
public TooManyRowsInAWindowFault(
|
||||
@JsonProperty("numRows") final int numRows,
|
||||
@JsonProperty("maxRows") final int maxRows
|
||||
)
|
||||
{
|
||||
super(
|
||||
CODE,
|
||||
"Too many rows in a window (requested = %d, max = %d). "
|
||||
+ " Try creating a window with a higher cardinality column or change the query shape."
|
||||
+ " Or you can change the max using query context param %s ."
|
||||
+ " Use it carefully as a higher value can lead to OutOfMemory errors. ",
|
||||
numRows,
|
||||
maxRows,
|
||||
MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW
|
||||
);
|
||||
this.numRows = numRows;
|
||||
this.maxRows = maxRows;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getNumRows()
|
||||
{
|
||||
return numRows;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getMaxRows()
|
||||
{
|
||||
return maxRows;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
TooManyRowsInAWindowFault that = (TooManyRowsInAWindowFault) o;
|
||||
return numRows == that.numRows && maxRows == that.maxRows;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(super.hashCode(), numRows, maxRows);
|
||||
}
|
||||
}
|
|
@ -43,6 +43,7 @@ import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
|
|||
import org.apache.druid.msq.kernel.StageDefinition;
|
||||
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
|
||||
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
|
||||
import org.apache.druid.msq.util.MultiStageQueryContext;
|
||||
import org.apache.druid.query.DataSource;
|
||||
import org.apache.druid.query.FilteredDataSource;
|
||||
import org.apache.druid.query.InlineDataSource;
|
||||
|
@ -88,7 +89,6 @@ public class DataSourcePlan
|
|||
* of subqueries.
|
||||
*/
|
||||
private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY = new HashMap<>();
|
||||
|
||||
private static final Logger log = new Logger(DataSourcePlan.class);
|
||||
|
||||
static {
|
||||
|
@ -209,7 +209,8 @@ public class DataSourcePlan
|
|||
(QueryDataSource) dataSource,
|
||||
maxWorkerCount,
|
||||
minStageNumber,
|
||||
broadcast
|
||||
broadcast,
|
||||
queryContext
|
||||
);
|
||||
} else if (dataSource instanceof UnionDataSource) {
|
||||
return forUnion(
|
||||
|
@ -419,15 +420,25 @@ public class DataSourcePlan
|
|||
final QueryDataSource dataSource,
|
||||
final int maxWorkerCount,
|
||||
final int minStageNumber,
|
||||
final boolean broadcast
|
||||
final boolean broadcast,
|
||||
@Nullable final QueryContext parentContext
|
||||
)
|
||||
{
|
||||
// check if parentContext has a window operator
|
||||
final Map<String, Object> windowShuffleMap = new HashMap<>();
|
||||
if (parentContext != null && parentContext.containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
|
||||
windowShuffleMap.put(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, parentContext.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL));
|
||||
}
|
||||
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
|
||||
queryId,
|
||||
|
||||
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
|
||||
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
|
||||
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
|
||||
windowShuffleMap.isEmpty()
|
||||
? dataSource.getQuery()
|
||||
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
|
||||
: dataSource.getQuery()
|
||||
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
|
||||
.withOverriddenContext(windowShuffleMap),
|
||||
queryKit,
|
||||
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount),
|
||||
maxWorkerCount,
|
||||
|
@ -683,7 +694,8 @@ public class DataSourcePlan
|
|||
(QueryDataSource) dataSource.getLeft(),
|
||||
maxWorkerCount,
|
||||
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
|
||||
false
|
||||
false,
|
||||
null
|
||||
);
|
||||
leftPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);
|
||||
|
||||
|
@ -696,7 +708,8 @@ public class DataSourcePlan
|
|||
(QueryDataSource) dataSource.getRight(),
|
||||
maxWorkerCount,
|
||||
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
|
||||
false
|
||||
false,
|
||||
null
|
||||
);
|
||||
rightPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);
|
||||
|
||||
|
|
|
@ -0,0 +1,520 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.querykit;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Iterables;
|
||||
import it.unimi.dsi.fastutil.ints.IntSet;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.frame.channel.FrameWithPartition;
|
||||
import org.apache.druid.frame.channel.ReadableFrameChannel;
|
||||
import org.apache.druid.frame.channel.WritableFrameChannel;
|
||||
import org.apache.druid.frame.processor.FrameProcessor;
|
||||
import org.apache.druid.frame.processor.FrameProcessors;
|
||||
import org.apache.druid.frame.processor.FrameRowTooLargeException;
|
||||
import org.apache.druid.frame.processor.ReturnOrAwait;
|
||||
import org.apache.druid.frame.read.FrameReader;
|
||||
import org.apache.druid.frame.write.FrameWriter;
|
||||
import org.apache.druid.frame.write.FrameWriterFactory;
|
||||
import org.apache.druid.java.util.common.Unit;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.msq.indexing.error.MSQException;
|
||||
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
|
||||
import org.apache.druid.query.groupby.ResultRow;
|
||||
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
|
||||
import org.apache.druid.query.operator.OffsetLimit;
|
||||
import org.apache.druid.query.operator.Operator;
|
||||
import org.apache.druid.query.operator.OperatorFactory;
|
||||
import org.apache.druid.query.operator.WindowOperatorQuery;
|
||||
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.Cursor;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
||||
{
|
||||
private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class);
|
||||
private final WindowOperatorQuery query;
|
||||
|
||||
private final List<OperatorFactory> operatorFactoryList;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ArrayList<RowsAndColumns> frameRowsAndCols;
|
||||
private final ArrayList<RowsAndColumns> resultRowAndCols;
|
||||
private final ReadableFrameChannel inputChannel;
|
||||
private final WritableFrameChannel outputChannel;
|
||||
private final FrameWriterFactory frameWriterFactory;
|
||||
private final FrameReader frameReader;
|
||||
private final ArrayList<ResultRow> objectsOfASingleRac;
|
||||
private final int maxRowsMaterialized;
|
||||
List<Integer> partitionColsIndex;
|
||||
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
|
||||
private Cursor frameCursor = null;
|
||||
private Supplier<ResultRow> rowSupplierFromFrameCursor;
|
||||
private ResultRow outputRow = null;
|
||||
private FrameWriter frameWriter = null;
|
||||
private final boolean isOverEmpty;
|
||||
|
||||
public WindowOperatorQueryFrameProcessor(
|
||||
WindowOperatorQuery query,
|
||||
ReadableFrameChannel inputChannel,
|
||||
WritableFrameChannel outputChannel,
|
||||
FrameWriterFactory frameWriterFactory,
|
||||
FrameReader frameReader,
|
||||
ObjectMapper jsonMapper,
|
||||
final List<OperatorFactory> operatorFactoryList,
|
||||
final RowSignature rowSignature,
|
||||
final boolean isOverEmpty,
|
||||
final int maxRowsMaterializedInWindow
|
||||
)
|
||||
{
|
||||
this.inputChannel = inputChannel;
|
||||
this.outputChannel = outputChannel;
|
||||
this.frameWriterFactory = frameWriterFactory;
|
||||
this.operatorFactoryList = operatorFactoryList;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.frameReader = frameReader;
|
||||
this.query = query;
|
||||
this.frameRowsAndCols = new ArrayList<>();
|
||||
this.resultRowAndCols = new ArrayList<>();
|
||||
this.objectsOfASingleRac = new ArrayList<>();
|
||||
this.partitionColsIndex = new ArrayList<>();
|
||||
this.isOverEmpty = isOverEmpty;
|
||||
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ReadableFrameChannel> inputChannels()
|
||||
{
|
||||
return Collections.singletonList(inputChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WritableFrameChannel> outputChannels()
|
||||
{
|
||||
return Collections.singletonList(outputChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
|
||||
{
|
||||
/*
|
||||
*
|
||||
* PARTITION BY A ORDER BY B
|
||||
*
|
||||
* Frame 1 -> rac1
|
||||
* A B
|
||||
* 1, 2
|
||||
* 1, 3
|
||||
* 2, 1 --> key changed
|
||||
* 2, 2
|
||||
*
|
||||
*
|
||||
* Frame 2 -> rac2
|
||||
* 3, 1 --> key changed
|
||||
* 3, 2
|
||||
* 3, 3
|
||||
* 3, 4
|
||||
*
|
||||
* Frame 3 -> rac3
|
||||
*
|
||||
* 3, 5
|
||||
* 3, 6
|
||||
* 4, 1 --> key changed
|
||||
* 4, 2
|
||||
*
|
||||
* In case of an empty OVER clause, all these racs need to be added to a single RowsAndColumns
|
||||
* to be processed. The way we can do this is to use a ConcatRowsAndColumns
|
||||
* ConcatRC [rac1, rac2, rac3]
|
||||
* Run all ops on this
|
||||
*
|
||||
*
|
||||
* The flow would look like:
|
||||
* 1. Validate if the operator has an empty OVER clause
|
||||
* 2. If 1 is true make a giant rows and columns (R&C) using concat as shown above
|
||||
* Let all operators run amok on that R&C
|
||||
* 3. If 1 is false
|
||||
* Read a frame
|
||||
* keep the older row in a class variable
|
||||
* check row by row and compare current with older row to check if partition boundary is reached
|
||||
* when frame partition by changes
|
||||
* create R&C for those particular set of columns, they would have the same partition key
|
||||
* output will be a single R&C
|
||||
* write to output channel
|
||||
*
|
||||
*
|
||||
* Future thoughts: {@link https://github.com/apache/druid/issues/16126}
|
||||
*
|
||||
* 1. We are writing 1 partition to each frame in this way. In case of low cardinality data
|
||||
* we will be making a large number of small frames. We can have a check to keep the size of a frame to a value
|
||||
* say 20k rows and keep on adding to the same pending frame and not create a new frame
|
||||
*
|
||||
* 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data
|
||||
* with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
|
||||
* Most of the window operations like SUM(), RANK(), RANGE() etc. can be computed with 2 passes of the data.
* We might consider reimplementing them in the MSQ way so that we do not have to materialize so much data
|
||||
*/
|
||||
|
||||
// Phase 1 of the execution
|
||||
// eagerly validate presence of empty OVER() clause
|
||||
if (isOverEmpty) {
|
||||
// if OVER() found
|
||||
// have to bring all data to a single executor for processing
|
||||
// convert each frame to rac
|
||||
// concat all the racs to make a giant rac
|
||||
// let all operators run on the giant rac when channel is finished
|
||||
if (inputChannel.canRead()) {
|
||||
final Frame frame = inputChannel.read();
|
||||
convertRowFrameToRowsAndColumns(frame);
|
||||
} else if (inputChannel.isFinished()) {
|
||||
runAllOpsOnMultipleRac(frameRowsAndCols);
|
||||
return ReturnOrAwait.returnObject(Unit.instance());
|
||||
} else {
|
||||
return ReturnOrAwait.awaitAll(inputChannels().size());
|
||||
}
|
||||
return ReturnOrAwait.runAgain();
|
||||
} else {
|
||||
// Aha, you found a PARTITION BY and maybe an ORDER BY too
|
||||
// PARTITION BY can also be on multiple keys
|
||||
// typically the last stage would already partition and sort for you
|
||||
// figure out frame boundaries and convert each distinct group to a rac
|
||||
// then run the windowing operator only on each rac
|
||||
if (frameCursor == null || frameCursor.isDone()) {
|
||||
if (readableInputs.isEmpty()) {
|
||||
return ReturnOrAwait.awaitAll(1);
|
||||
} else if (inputChannel.canRead()) {
|
||||
final Frame frame = inputChannel.read();
|
||||
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
|
||||
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
|
||||
partitionColsIndex = findPartitionColumns(frameReader.signature());
|
||||
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
|
||||
for (int i = 0; i < fieldSuppliers.length; i++) {
|
||||
final ColumnValueSelector<?> selector =
|
||||
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
|
||||
fieldSuppliers[i] = selector::getObject;
|
||||
|
||||
}
|
||||
rowSupplierFromFrameCursor = () -> {
|
||||
final ResultRow row = ResultRow.create(fieldSuppliers.length);
|
||||
for (int i = 0; i < fieldSuppliers.length; i++) {
|
||||
row.set(i, fieldSuppliers[i].get());
|
||||
}
|
||||
return row;
|
||||
};
|
||||
} else if (inputChannel.isFinished()) {
|
||||
// reached end of channel
|
||||
// if there is data remaining
|
||||
// write it into a rac
|
||||
// and run operators on it
|
||||
if (!objectsOfASingleRac.isEmpty()) {
|
||||
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
|
||||
throw new MSQException(new TooManyRowsInAWindowFault(objectsOfASingleRac.size(), maxRowsMaterialized));
|
||||
}
|
||||
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
|
||||
objectsOfASingleRac,
|
||||
frameReader.signature()
|
||||
);
|
||||
runAllOpsOnSingleRac(rac);
|
||||
objectsOfASingleRac.clear();
|
||||
}
|
||||
return ReturnOrAwait.returnObject(Unit.instance());
|
||||
} else {
|
||||
return ReturnOrAwait.runAgain();
|
||||
}
|
||||
}
|
||||
while (!frameCursor.isDone()) {
|
||||
final ResultRow currentRow = rowSupplierFromFrameCursor.get();
|
||||
if (outputRow == null) {
|
||||
outputRow = currentRow;
|
||||
objectsOfASingleRac.add(currentRow);
|
||||
} else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
|
||||
// if they have the same partition key
|
||||
// keep adding them after checking
|
||||
// guardrails
|
||||
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
|
||||
throw new MSQException(new TooManyRowsInAWindowFault(
|
||||
objectsOfASingleRac.size(),
|
||||
maxRowsMaterialized
|
||||
));
|
||||
}
|
||||
objectsOfASingleRac.add(currentRow);
|
||||
|
||||
} else {
|
||||
// key change noted
|
||||
// create rac from the rows seen before
|
||||
// run the operators on these rows and columns
|
||||
// clean up the object to hold the new rows only
|
||||
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
|
||||
throw new MSQException(new TooManyRowsInAWindowFault(
|
||||
objectsOfASingleRac.size(),
|
||||
maxRowsMaterialized
|
||||
));
|
||||
}
|
||||
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
|
||||
objectsOfASingleRac,
|
||||
frameReader.signature()
|
||||
);
|
||||
runAllOpsOnSingleRac(rac);
|
||||
objectsOfASingleRac.clear();
|
||||
outputRow = currentRow.copy();
|
||||
return ReturnOrAwait.runAgain();
|
||||
}
|
||||
frameCursor.advance();
|
||||
}
|
||||
}
|
||||
return ReturnOrAwait.runAgain();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param singleRac Use this {@link RowsAndColumns} as a single input for the operators to be run
|
||||
*/
|
||||
private void runAllOpsOnSingleRac(RowsAndColumns singleRac)
|
||||
{
|
||||
Operator op = new Operator()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
|
||||
{
|
||||
receiver.push(singleRac);
|
||||
if (singleRac.numRows() > maxRowsMaterialized) {
|
||||
throw new MSQException(new TooManyRowsInAWindowFault(singleRac.numRows(), maxRowsMaterialized));
|
||||
}
|
||||
receiver.completed();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
runOperatorsAfterThis(op);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param listOfRacs Concat this list of {@link RowsAndColumns} to a {@link ConcatRowsAndColumns} to use as a single input for the operators to be run
|
||||
*/
|
||||
private void runAllOpsOnMultipleRac(ArrayList<RowsAndColumns> listOfRacs)
|
||||
{
|
||||
Operator op = new Operator()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
|
||||
{
|
||||
RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs);
|
||||
if (rac.numRows() > maxRowsMaterialized) {
|
||||
throw new MSQException(new TooManyRowsInAWindowFault(rac.numRows(), maxRowsMaterialized));
|
||||
}
|
||||
receiver.push(rac);
|
||||
receiver.completed();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
runOperatorsAfterThis(op);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param op Base operator for the operators to be run. Other operators are wrapped under this to run
|
||||
*/
|
||||
private void runOperatorsAfterThis(Operator op)
|
||||
{
|
||||
for (OperatorFactory of : operatorFactoryList) {
|
||||
op = of.wrap(op);
|
||||
}
|
||||
Operator.go(op, new Operator.Receiver()
|
||||
{
|
||||
@Override
|
||||
public Operator.Signal push(RowsAndColumns rac)
|
||||
{
|
||||
resultRowAndCols.add(rac);
|
||||
return Operator.Signal.GO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completed()
|
||||
{
|
||||
try {
|
||||
// resultRowsAndCols has reference to frameRowsAndCols
|
||||
// due to the chain of calls across the ops
|
||||
// so we can clear after writing to output
|
||||
flushAllRowsAndCols(resultRowAndCols);
|
||||
frameRowsAndCols.clear();
|
||||
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
finally {
|
||||
frameRowsAndCols.clear();
|
||||
resultRowAndCols.clear();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param resultRowAndCols Flush the list of {@link RowsAndColumns} to a frame
|
||||
* @throws IOException
|
||||
*/
|
||||
private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) throws IOException
|
||||
{
|
||||
RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols);
|
||||
AtomicInteger rowId = new AtomicInteger(0);
|
||||
createFrameWriterIfNeeded(rac, rowId);
|
||||
writeRacToFrame(rac, rowId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param rac The {@link RowsAndColumns} object used to create the frame writer, if needed
|
||||
* @param rowId RowId to get the column selector factory from the {@link RowsAndColumns} object
|
||||
*/
|
||||
private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId)
|
||||
{
|
||||
if (frameWriter == null) {
|
||||
final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
|
||||
final ColumnSelectorFactory frameWriterColumnSelectorFactory = csfm.make(rowId);
|
||||
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory);
|
||||
currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param rac {@link RowsAndColumns} to be written to frame
|
||||
* @param rowId Counter to keep track of how many rows are added
|
||||
* @throws IOException
|
||||
*/
|
||||
public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws IOException
|
||||
{
|
||||
final int numRows = rac.numRows();
|
||||
rowId.set(0);
|
||||
while (rowId.get() < numRows) {
|
||||
final boolean didAddToFrame = frameWriter.addSelection();
|
||||
if (didAddToFrame) {
|
||||
rowId.incrementAndGet();
|
||||
} else if (frameWriter.getNumRows() == 0) {
|
||||
throw new FrameRowTooLargeException(currentAllocatorCapacity);
|
||||
} else {
|
||||
flushFrameWriter();
|
||||
return;
|
||||
}
|
||||
}
|
||||
flushFrameWriter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() throws IOException
|
||||
{
|
||||
FrameProcessors.closeAll(inputChannels(), outputChannels(), frameWriter);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Number of rows flushed to the output channel
|
||||
* @throws IOException
|
||||
*/
|
||||
private long flushFrameWriter() throws IOException
|
||||
{
|
||||
if (frameWriter == null || frameWriter.getNumRows() <= 0) {
|
||||
if (frameWriter != null) {
|
||||
frameWriter.close();
|
||||
frameWriter = null;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
final Frame frame = Frame.wrap(frameWriter.toByteArray());
|
||||
Iterables.getOnlyElement(outputChannels()).write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
|
||||
frameWriter.close();
|
||||
frameWriter = null;
|
||||
return frame.numRows();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param frame Row based frame to be converted to a {@link RowsAndColumns} object
|
||||
* Throws an exception if the resultant RAC goes above the guardrail value
|
||||
*/
|
||||
private void convertRowFrameToRowsAndColumns(Frame frame)
|
||||
{
|
||||
final RowSignature signature = frameReader.signature();
|
||||
RowBasedFrameRowsAndColumns frameRowsAndColumns = new RowBasedFrameRowsAndColumns(frame, signature);
|
||||
LazilyDecoratedRowsAndColumns ldrc = new LazilyDecoratedRowsAndColumns(
|
||||
frameRowsAndColumns,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
OffsetLimit.limit(Integer.MAX_VALUE),
|
||||
null,
|
||||
null
|
||||
);
|
||||
// check if existing + newly added rows exceed guardrails
|
||||
if (frameRowsAndCols.size() + ldrc.numRows() > maxRowsMaterialized) {
|
||||
throw new MSQException(new TooManyRowsInAWindowFault(
|
||||
frameRowsAndCols.size() + ldrc.numRows(),
|
||||
maxRowsMaterialized
|
||||
));
|
||||
}
|
||||
frameRowsAndCols.add(ldrc);
|
||||
}
|
||||
|
||||
private List<Integer> findPartitionColumns(RowSignature rowSignature)
|
||||
{
|
||||
List<Integer> indexList = new ArrayList<>();
|
||||
for (OperatorFactory of : operatorFactoryList) {
|
||||
if (of instanceof NaivePartitioningOperatorFactory) {
|
||||
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
|
||||
indexList.add(rowSignature.indexOf(s));
|
||||
}
|
||||
}
|
||||
}
|
||||
return indexList;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Compare two rows based only on the columns in the partitionIndices
* In case the partition indices are empty or null, compare the entire row
|
||||
*
|
||||
*/
|
||||
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
|
||||
{
|
||||
if (partitionIndices == null || partitionIndices.isEmpty()) {
|
||||
return row1.equals(row2);
|
||||
} else {
|
||||
int match = 0;
|
||||
for (int i : partitionIndices) {
|
||||
if (Objects.equals(row1.get(i), row2.get(i))) {
|
||||
match++;
|
||||
}
|
||||
}
|
||||
return match == partitionIndices.size();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,195 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.querykit;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
|
||||
import org.apache.druid.frame.processor.FrameProcessor;
|
||||
import org.apache.druid.frame.processor.OutputChannel;
|
||||
import org.apache.druid.frame.processor.OutputChannelFactory;
|
||||
import org.apache.druid.frame.processor.OutputChannels;
|
||||
import org.apache.druid.frame.processor.manager.ProcessorManagers;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.msq.counters.CounterTracker;
|
||||
import org.apache.druid.msq.input.InputSlice;
|
||||
import org.apache.druid.msq.input.InputSliceReader;
|
||||
import org.apache.druid.msq.input.ReadableInput;
|
||||
import org.apache.druid.msq.input.stage.ReadablePartition;
|
||||
import org.apache.druid.msq.input.stage.StageInputSlice;
|
||||
import org.apache.druid.msq.kernel.FrameContext;
|
||||
import org.apache.druid.msq.kernel.ProcessorsAndChannels;
|
||||
import org.apache.druid.msq.kernel.StageDefinition;
|
||||
import org.apache.druid.query.operator.OperatorFactory;
|
||||
import org.apache.druid.query.operator.WindowOperatorQuery;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@JsonTypeName("window")
|
||||
public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessorFactory
|
||||
{
|
||||
private final WindowOperatorQuery query;
|
||||
private final List<OperatorFactory> operatorList;
|
||||
private final RowSignature stageRowSignature;
|
||||
private final boolean isEmptyOver;
|
||||
private final int maxRowsMaterializedInWindow;
|
||||
|
||||
@JsonCreator
|
||||
public WindowOperatorQueryFrameProcessorFactory(
|
||||
@JsonProperty("query") WindowOperatorQuery query,
|
||||
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
|
||||
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
|
||||
@JsonProperty("emptyOver") boolean emptyOver,
|
||||
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow
|
||||
)
|
||||
{
|
||||
this.query = Preconditions.checkNotNull(query, "query");
|
||||
this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator");
|
||||
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
|
||||
this.isEmptyOver = emptyOver;
|
||||
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
|
||||
}
|
||||
|
||||
@JsonProperty("query")
|
||||
public WindowOperatorQuery getQuery()
|
||||
{
|
||||
return query;
|
||||
}
|
||||
|
||||
@JsonProperty("operatorList")
|
||||
public List<OperatorFactory> getOperators()
|
||||
{
|
||||
return operatorList;
|
||||
}
|
||||
|
||||
@JsonProperty("stageRowSignature")
|
||||
public RowSignature getSignature()
|
||||
{
|
||||
return stageRowSignature;
|
||||
}
|
||||
|
||||
@JsonProperty("emptyOver")
|
||||
public boolean isEmptyOverFound()
|
||||
{
|
||||
return isEmptyOver;
|
||||
}
|
||||
|
||||
@JsonProperty("maxRowsMaterializedInWindow")
|
||||
public int getMaxRowsMaterializedInWindow()
|
||||
{
|
||||
return maxRowsMaterializedInWindow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorsAndChannels<Object, Long> makeProcessors(
|
||||
StageDefinition stageDefinition,
|
||||
int workerNumber,
|
||||
List<InputSlice> inputSlices,
|
||||
InputSliceReader inputSliceReader,
|
||||
@Nullable Object extra,
|
||||
OutputChannelFactory outputChannelFactory,
|
||||
FrameContext frameContext,
|
||||
int maxOutstandingProcessors,
|
||||
CounterTracker counters,
|
||||
Consumer<Throwable> warningPublisher
|
||||
)
|
||||
{
|
||||
// Expecting a single input slice from some prior stage.
|
||||
final StageInputSlice slice = (StageInputSlice) Iterables.getOnlyElement(inputSlices);
|
||||
final Int2ObjectSortedMap<OutputChannel> outputChannels = new Int2ObjectAVLTreeMap<>();
|
||||
|
||||
for (final ReadablePartition partition : slice.getPartitions()) {
|
||||
outputChannels.computeIfAbsent(
|
||||
partition.getPartitionNumber(),
|
||||
i -> {
|
||||
try {
|
||||
return outputChannelFactory.openChannel(i);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
final Sequence<ReadableInput> readableInputs =
|
||||
Sequences.simple(inputSliceReader.attach(0, slice, counters, warningPublisher));
|
||||
|
||||
final Sequence<FrameProcessor<Object>> processors = readableInputs.map(
|
||||
readableInput -> {
|
||||
final OutputChannel outputChannel =
|
||||
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());
|
||||
|
||||
return new WindowOperatorQueryFrameProcessor(
|
||||
query,
|
||||
readableInput.getChannel(),
|
||||
outputChannel.getWritableChannel(),
|
||||
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator()),
|
||||
readableInput.getChannelFrameReader(),
|
||||
frameContext.jsonMapper(),
|
||||
operatorList,
|
||||
stageRowSignature,
|
||||
isEmptyOver,
|
||||
maxRowsMaterializedInWindow
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
return new ProcessorsAndChannels<>(
|
||||
ProcessorManagers.of(processors),
|
||||
OutputChannels.wrapReadOnly(ImmutableList.copyOf(outputChannels.values()))
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
WindowOperatorQueryFrameProcessorFactory that = (WindowOperatorQueryFrameProcessorFactory) o;
|
||||
return isEmptyOver == that.isEmptyOver
|
||||
&& maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
|
||||
&& Objects.equals(query, that.query)
|
||||
&& Objects.equals(operatorList, that.operatorList)
|
||||
&& Objects.equals(stageRowSignature, that.stageRowSignature);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,250 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.querykit;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.frame.key.ClusterBy;
|
||||
import org.apache.druid.frame.key.KeyColumn;
|
||||
import org.apache.druid.frame.key.KeyOrder;
|
||||
import org.apache.druid.msq.exec.Limits;
|
||||
import org.apache.druid.msq.input.stage.StageInputSpec;
|
||||
import org.apache.druid.msq.kernel.HashShuffleSpec;
|
||||
import org.apache.druid.msq.kernel.QueryDefinition;
|
||||
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
|
||||
import org.apache.druid.msq.kernel.ShuffleSpec;
|
||||
import org.apache.druid.msq.kernel.StageDefinition;
|
||||
import org.apache.druid.msq.util.MultiStageQueryContext;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.operator.ColumnWithDirection;
|
||||
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
|
||||
import org.apache.druid.query.operator.NaiveSortOperatorFactory;
|
||||
import org.apache.druid.query.operator.OperatorFactory;
|
||||
import org.apache.druid.query.operator.WindowOperatorQuery;
|
||||
import org.apache.druid.query.operator.window.WindowOperatorFactory;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
||||
{
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public WindowOperatorQueryKit(ObjectMapper jsonMapper)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryDefinition makeQueryDefinition(
|
||||
String queryId,
|
||||
WindowOperatorQuery originalQuery,
|
||||
QueryKit<Query<?>> queryKit,
|
||||
ShuffleSpecFactory resultShuffleSpecFactory,
|
||||
int maxWorkerCount,
|
||||
int minStageNumber
|
||||
)
|
||||
{
|
||||
// need to validate query first
|
||||
// populate the group of operators to be processed as each stage
|
||||
// the size of the operators is the number of serialized stages
|
||||
// later we should also check if these can be parallelized
|
||||
// check whether there is an empty OVER() clause or not
|
||||
List<List<OperatorFactory>> operatorList = new ArrayList<>();
|
||||
boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperators(originalQuery, operatorList);
|
||||
|
||||
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
|
||||
// add this shuffle spec to the last stage of the inner query
|
||||
|
||||
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder().queryId(queryId);
|
||||
if (nextShuffleSpec != null) {
|
||||
final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
|
||||
originalQuery = (WindowOperatorQuery) originalQuery.withOverriddenContext(ImmutableMap.of(
|
||||
MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
|
||||
windowClusterBy
|
||||
));
|
||||
}
|
||||
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
|
||||
queryKit,
|
||||
queryId,
|
||||
originalQuery.context(),
|
||||
originalQuery.getDataSource(),
|
||||
originalQuery.getQuerySegmentSpec(),
|
||||
originalQuery.getFilter(),
|
||||
null,
|
||||
maxWorkerCount,
|
||||
minStageNumber,
|
||||
false
|
||||
);
|
||||
|
||||
dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
|
||||
|
||||
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
|
||||
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
|
||||
final int maxRowsMaterialized;
|
||||
RowSignature rowSignature = queryToRun.getRowSignature();
|
||||
if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) {
|
||||
maxRowsMaterialized = (int) originalQuery.context()
|
||||
.get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
|
||||
} else {
|
||||
maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
|
||||
}
|
||||
|
||||
|
||||
if (isEmptyOverFound) {
|
||||
// empty over clause found
|
||||
// moving everything to a single partition
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber)
|
||||
.inputs(new StageInputSpec(firstStageNumber - 1))
|
||||
.signature(rowSignature)
|
||||
.maxWorkerCount(maxWorkerCount)
|
||||
.shuffleSpec(null)
|
||||
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
|
||||
queryToRun,
|
||||
queryToRun.getOperators(),
|
||||
rowSignature,
|
||||
true,
|
||||
maxRowsMaterialized
|
||||
))
|
||||
);
|
||||
} else {
|
||||
// there are multiple windows present in the query
|
||||
// Create stages for each window in the query
|
||||
// These stages will be serialized
|
||||
// the partition by clause of the next window will be the shuffle key for the previous window
|
||||
RowSignature.Builder bob = RowSignature.builder();
|
||||
final int numberOfWindows = operatorList.size();
|
||||
final int baseSize = rowSignature.size() - numberOfWindows;
|
||||
for (int i = 0; i < baseSize; i++) {
|
||||
bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
|
||||
}
|
||||
|
||||
for (int i = 0; i < numberOfWindows; i++) {
|
||||
bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
|
||||
// find the shuffle spec of the next stage
|
||||
// if it is the last stage set the next shuffle spec to single partition
|
||||
if (i + 1 == numberOfWindows) {
|
||||
nextShuffleSpec = ShuffleSpecFactories.singlePartition()
|
||||
.build(ClusterBy.none(), false);
|
||||
} else {
|
||||
nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount);
|
||||
}
|
||||
|
||||
final RowSignature intermediateSignature = bob.build();
|
||||
final RowSignature stageRowSignature;
|
||||
if (nextShuffleSpec == null) {
|
||||
stageRowSignature = intermediateSignature;
|
||||
} else {
|
||||
stageRowSignature = QueryKitUtils.sortableSignature(
|
||||
intermediateSignature,
|
||||
nextShuffleSpec.clusterBy().getColumns()
|
||||
);
|
||||
}
|
||||
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber + i)
|
||||
.inputs(new StageInputSpec(firstStageNumber + i - 1))
|
||||
.signature(stageRowSignature)
|
||||
.maxWorkerCount(maxWorkerCount)
|
||||
.shuffleSpec(nextShuffleSpec)
|
||||
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
|
||||
queryToRun,
|
||||
operatorList.get(i),
|
||||
stageRowSignature,
|
||||
false,
|
||||
maxRowsMaterialized
|
||||
))
|
||||
);
|
||||
}
|
||||
}
|
||||
return queryDefBuilder.queryId(queryId).build();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param originalQuery
|
||||
* @param operatorList
|
||||
* @return true if the operator list has a partitioning operator with an empty OVER clause, false otherwise
|
||||
*/
|
||||
private boolean ifEmptyOverPresentInWindowOperators(
|
||||
WindowOperatorQuery originalQuery,
|
||||
List<List<OperatorFactory>> operatorList
|
||||
)
|
||||
{
|
||||
final List<OperatorFactory> operators = originalQuery.getOperators();
|
||||
List<OperatorFactory> operatorFactoryList = new ArrayList<>();
|
||||
for (OperatorFactory of : operators) {
|
||||
operatorFactoryList.add(of);
|
||||
if (of instanceof WindowOperatorFactory) {
|
||||
operatorList.add(operatorFactoryList);
|
||||
operatorFactoryList = new ArrayList<>();
|
||||
} else if (of instanceof NaivePartitioningOperatorFactory) {
|
||||
if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) {
|
||||
operatorList.clear();
|
||||
operatorList.add(originalQuery.getOperators());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorFactories, int maxWorkerCount)
|
||||
{
|
||||
NaivePartitioningOperatorFactory partition = null;
|
||||
NaiveSortOperatorFactory sort = null;
|
||||
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
|
||||
for (OperatorFactory of : operatorFactories) {
|
||||
if (of instanceof NaivePartitioningOperatorFactory) {
|
||||
partition = (NaivePartitioningOperatorFactory) of;
|
||||
} else if (of instanceof NaiveSortOperatorFactory) {
|
||||
sort = (NaiveSortOperatorFactory) of;
|
||||
}
|
||||
}
|
||||
Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
|
||||
if (sort != null) {
|
||||
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
|
||||
colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
|
||||
}
|
||||
}
|
||||
assert partition != null;
|
||||
if (partition.getPartitionColumns().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
for (String partitionColumn : partition.getPartitionColumns()) {
|
||||
KeyColumn kc;
|
||||
if (colMap.containsKey(partitionColumn)) {
|
||||
if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
|
||||
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
|
||||
} else {
|
||||
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
|
||||
}
|
||||
} else {
|
||||
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
|
||||
}
|
||||
keyColsOfWindow.add(kc);
|
||||
}
|
||||
return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
|
||||
}
|
||||
}
|
|
@ -174,7 +174,6 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
|
|||
|
||||
while (!frameCursor.isDone()) {
|
||||
final ResultRow currentRow = rowSupplierFromFrameCursor.get();
|
||||
|
||||
if (outputRow == null) {
|
||||
outputRow = currentRow.copy();
|
||||
} else if (compareFn.compare(outputRow, currentRow) == 0) {
|
||||
|
|
|
@ -28,8 +28,10 @@ import org.apache.druid.java.util.common.ISE;
|
|||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.msq.input.stage.StageInputSpec;
|
||||
import org.apache.druid.msq.kernel.HashShuffleSpec;
|
||||
import org.apache.druid.msq.kernel.QueryDefinition;
|
||||
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
|
||||
import org.apache.druid.msq.kernel.ShuffleSpec;
|
||||
import org.apache.druid.msq.kernel.StageDefinition;
|
||||
import org.apache.druid.msq.querykit.DataSourcePlan;
|
||||
import org.apache.druid.msq.querykit.QueryKit;
|
||||
|
@ -37,6 +39,7 @@ import org.apache.druid.msq.querykit.QueryKitUtils;
|
|||
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
|
||||
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
|
||||
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
|
||||
import org.apache.druid.msq.util.MultiStageQueryContext;
|
||||
import org.apache.druid.query.DimensionComparisonUtils;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
|
@ -62,6 +65,7 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
|
|||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public QueryDefinition makeQueryDefinition(
|
||||
final String queryId,
|
||||
|
@ -164,39 +168,102 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
|
|||
partitionBoost
|
||||
);
|
||||
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber + 1)
|
||||
.inputs(new StageInputSpec(firstStageNumber))
|
||||
.signature(resultSignature)
|
||||
.maxWorkerCount(maxWorkerCount)
|
||||
.shuffleSpec(
|
||||
shuffleSpecFactoryPostAggregation != null
|
||||
? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
|
||||
: null
|
||||
)
|
||||
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
|
||||
);
|
||||
final ShuffleSpec nextShuffleWindowSpec = getShuffleSpecForNextWindow(originalQuery, maxWorkerCount);
|
||||
|
||||
if (doLimitOrOffset) {
|
||||
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
|
||||
if (nextShuffleWindowSpec == null) {
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber + 2)
|
||||
.inputs(new StageInputSpec(firstStageNumber + 1))
|
||||
StageDefinition.builder(firstStageNumber + 1)
|
||||
.inputs(new StageInputSpec(firstStageNumber))
|
||||
.signature(resultSignature)
|
||||
.maxWorkerCount(1)
|
||||
.shuffleSpec(null) // no shuffling should be required after a limit processor.
|
||||
.processorFactory(
|
||||
new OffsetLimitFrameProcessorFactory(
|
||||
limitSpec.getOffset(),
|
||||
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
|
||||
)
|
||||
.maxWorkerCount(maxWorkerCount)
|
||||
.shuffleSpec(
|
||||
shuffleSpecFactoryPostAggregation != null
|
||||
? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
|
||||
: null
|
||||
)
|
||||
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
|
||||
);
|
||||
|
||||
if (doLimitOrOffset) {
|
||||
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber + 2)
|
||||
.inputs(new StageInputSpec(firstStageNumber + 1))
|
||||
.signature(resultSignature)
|
||||
.maxWorkerCount(1)
|
||||
.shuffleSpec(null) // no shuffling should be required after a limit processor.
|
||||
.processorFactory(
|
||||
new OffsetLimitFrameProcessorFactory(
|
||||
limitSpec.getOffset(),
|
||||
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
final RowSignature stageSignature;
|
||||
// sort the signature to make sure the prefix is aligned
|
||||
stageSignature = QueryKitUtils.sortableSignature(
|
||||
resultSignature,
|
||||
nextShuffleWindowSpec.clusterBy().getColumns()
|
||||
);
|
||||
|
||||
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber + 1)
|
||||
.inputs(new StageInputSpec(firstStageNumber))
|
||||
.signature(stageSignature)
|
||||
.maxWorkerCount(maxWorkerCount)
|
||||
.shuffleSpec(doLimitOrOffset ? (shuffleSpecFactoryPostAggregation != null
|
||||
? shuffleSpecFactoryPostAggregation.build(
|
||||
resultClusterBy,
|
||||
false
|
||||
)
|
||||
: null) : nextShuffleWindowSpec)
|
||||
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
|
||||
);
|
||||
if (doLimitOrOffset) {
|
||||
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber + 2)
|
||||
.inputs(new StageInputSpec(firstStageNumber + 1))
|
||||
.signature(resultSignature)
|
||||
.maxWorkerCount(1)
|
||||
.shuffleSpec(null)
|
||||
.processorFactory(
|
||||
new OffsetLimitFrameProcessorFactory(
|
||||
limitSpec.getOffset(),
|
||||
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return queryDefBuilder.queryId(queryId).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param originalQuery query whose context carries the shuffle columns for the next window, if present
|
||||
* @param maxWorkerCount max worker count
|
||||
* @return shuffle spec without partition boosting for next stage, null if there is no partition by for next window
|
||||
*/
|
||||
private ShuffleSpec getShuffleSpecForNextWindow(GroupByQuery originalQuery, int maxWorkerCount)
|
||||
{
|
||||
final ShuffleSpec nextShuffleWindowSpec;
|
||||
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
|
||||
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
|
||||
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
|
||||
nextShuffleWindowSpec = new HashShuffleSpec(
|
||||
windowClusterBy,
|
||||
maxWorkerCount
|
||||
);
|
||||
} else {
|
||||
nextShuffleWindowSpec = null;
|
||||
}
|
||||
return nextShuffleWindowSpec;
|
||||
}
|
||||
|
||||
/**
|
||||
* Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all
|
||||
* aggregations are nonfinalized.
|
||||
|
|
|
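To make the hand-off concrete, here is a minimal sketch (not taken from the patch) of how a window's PARTITION BY columns travel through the MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL context key and come back out as the hash shuffle built by getShuffleSpecForNextWindow above. The producing side and the "channel" column are assumptions for illustration; only the consuming side appears in this hunk.

// Hedged sketch, assuming a window over PARTITION BY "channel" (hypothetical column).
// Producing side (assumed): the planner stashes the partitioning as a ClusterBy in the context.
ClusterBy windowPartitionBy = new ClusterBy(
    ImmutableList.of(new KeyColumn("channel", KeyOrder.ASCENDING)),
    0
);
Map<String, Object> contextWithShuffle = new HashMap<>(originalQuery.getContext());
contextWithShuffle.put(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, windowPartitionBy);

// Consuming side (mirrors getShuffleSpecForNextWindow above): the previous stage hashes by it.
ShuffleSpec nextShuffle = new HashShuffleSpec(
    (ClusterBy) contextWithShuffle.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL),
    maxWorkerCount
);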
@ -36,6 +36,7 @@ import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;

@ -137,9 +138,27 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
);
}

// Add partition boosting column.
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
// Update partition by of next window
final RowSignature signatureSoFar = signatureBuilder.build();
boolean addShuffle = true;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
for (KeyColumn c : windowClusterBy.getColumns()) {
if (!signatureSoFar.contains(c.columnName())) {
addShuffle = false;
break;
}
}
if (addShuffle) {
clusterByColumns.addAll(windowClusterBy.getColumns());
}
} else {
// Add partition boosting column.
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
}

final ClusterBy clusterBy =
QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);
@ -127,10 +127,11 @@ public class MSQTaskSqlEngine implements SqlEngine
case TOPN_QUERY:
case TIME_BOUNDARY_QUERY:
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case ALLOW_TOP_LEVEL_UNION_ALL:
case GROUPBY_IMPLICITLY_SORTS:
return false;
case WINDOW_FUNCTIONS:
case WINDOW_LEAF_OPERATOR:
case UNNEST:
case CAN_SELECT:
case CAN_INSERT:
@ -89,6 +89,9 @@ import java.util.stream.Collectors;
* {@link TaskLockType}. If the flag is not set, msq uses {@link TaskLockType#EXCLUSIVE} for replace queries and
* {@link TaskLockType#SHARED} for insert queries.
*
* <li><b>maxRowsMaterializedInWindow</b>: Query context that specifies the largest window size that can be processed
* using window functions in MSQ. This is to ensure guardrails using window function in MSQ.
*
* </ol>
**/
public class MultiStageQueryContext

@ -154,6 +157,10 @@ public class MultiStageQueryContext
public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode";
public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.MVD;

public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol";

public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "maxRowsMaterializedInWindow";

public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification";

private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
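As a usage note, the guardrail key defined above is supplied like any other MSQ query-context parameter. A minimal, illustrative snippet follows; the concrete limit value and the ImmutableMap usage are assumptions, not part of the patch.

// Illustrative only: setting the window materialization guardrail for one query.
// The key comes from MultiStageQueryContext above; the value is just an example.
Map<String, Object> queryContext = ImmutableMap.of(
    MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 100000
);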
@ -2459,8 +2459,8 @@ public class MSQSelectTest extends MSQTestBase
.build();

RowSignature resultSignature1 = RowSignature.builder()
.add("dim3", ColumnType.STRING)
.build();
.add("dim3", ColumnType.STRING)
.build();

RowSignature outputSignature = RowSignature.builder()
.add("d3", ColumnType.STRING)

File diff suppressed because it is too large
@ -177,6 +177,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.LookylooModule;
import org.apache.druid.sql.calcite.util.QueryFrameworkUtils;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.apache.druid.sql.calcite.view.InProcessViewManager;
import org.apache.druid.sql.guice.SqlBindings;
import org.apache.druid.storage.StorageConfig;

@ -222,6 +223,7 @@ import java.util.stream.Collectors;

import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2;
import static org.hamcrest.MatcherAssert.assertThat;

@ -657,6 +659,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
.rows(ROWS2)
.buildMMappedIndex();
break;
case WIKIPEDIA:
index = TestDataBuilder.makeWikipediaIndex(newTempFolder());
break;
default:
throw new ISE("Cannot query segment %s in test runner", segmentId);
@ -21,7 +21,7 @@ package org.apache.druid.frame.segment;

import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;

@ -87,7 +87,7 @@ public class FrameSegment implements Segment
public <T> T as(@Nonnull Class<T> clazz)
{
if (CloseableShapeshifter.class.equals(clazz)) {
return (T) new FrameRowsAndColumns(frame, frameReader.signature());
return (T) new ColumnBasedFrameRowsAndColumns(frame, frameReader.signature());
}
return Segment.super.as(clazz);
}
@ -28,7 +28,7 @@ import java.nio.ByteOrder;
/**
* Writer for {@link org.apache.druid.frame.Frame}. See that class for format information.
*
* Generally obtained through a {@link FrameWriters#makeFrameWriterFactory}.
* Generally obtained through a {@link FrameWriters#makeFrameWriterFactory}
*/
public interface FrameWriter extends Closeable
{
@ -31,7 +31,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.RowSignature;

@ -112,7 +112,7 @@ public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory
sigBob.add(column, input.findColumn(column).toAccessor().getType());
}

return new FrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
}
return input;
}
@ -122,6 +122,4 @@ public class WindowFramedAggregateProcessor implements Processor
WindowFramedAggregateProcessor other = (WindowFramedAggregateProcessor) obj;
return Arrays.equals(aggregations, other.aggregations) && Objects.equals(frame, other.frame);
}

}
@ -40,7 +40,7 @@ import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;

@ -169,7 +169,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
if (thePair == null) {
reset(new EmptyRowsAndColumns());
} else {
reset(new FrameRowsAndColumns(Frame.wrap(thePair.lhs), thePair.rhs));
reset(new ColumnBasedFrameRowsAndColumns(Frame.wrap(thePair.lhs), thePair.rhs));
}
}
}

@ -225,9 +225,9 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
cols = base.getColumnNames();
} else {
cols = ImmutableList.<String>builder()
.addAll(base.getColumnNames())
.addAll(virtualColumns.getColumnNames())
.build();
.addAll(base.getColumnNames())
.addAll(virtualColumns.getColumnNames())
.build();
}
}
AtomicReference<RowSignature> siggy = new AtomicReference<>(null);
@ -21,6 +21,7 @@ package org.apache.druid.query.rowsandcols;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;

@ -29,6 +30,7 @@ import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

@ -80,6 +82,24 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
);
}

public static MapOfColumnsRowsAndColumns fromResultRow(ArrayList<ResultRow> objs, RowSignature signature)
{
final Builder bob = builder();
if (!objs.isEmpty()) {
Object[][] columnOriented = new Object[objs.get(0).length()][objs.size()];
for (int i = 0; i < objs.size(); ++i) {
for (int j = 0; j < objs.get(i).length(); ++j) {
columnOriented[j][i] = objs.get(i).get(j);
}
}
for (int i = 0; i < signature.size(); ++i) {
final ColumnType type = signature.getColumnType(i).orElse(null);
bob.add(signature.getColumnName(i), columnOriented[i], type);
}
}
return bob.build();
}

public static MapOfColumnsRowsAndColumns fromRowObjects(Object[][] objs, RowSignature signature)
{
final Builder bob = builder();
@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;

@ -138,7 +138,7 @@ public class StorageAdapterRowsAndColumns implements CloseableShapeshifter, Rows
return new EmptyRowsAndColumns();
} else {
final byte[] bytes = writer.toByteArray();
return new FrameRowsAndColumns(Frame.wrap(bytes), rowSignature);
return new ColumnBasedFrameRowsAndColumns(Frame.wrap(bytes), rowSignature);
}
}
}
@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.rowsandcols.concrete;

import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;

public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
{
private final Frame frame;
private final RowSignature signature;
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();

public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
{
this.frame = FrameType.COLUMNAR.ensureType(frame);
this.signature = signature;
}

@Override
public Collection<String> getColumnNames()
{
return signature.getColumnNames();
}

@Override
public int numRows()
{
return frame.numRows();
}

@Nullable
@Override
public Column findColumn(String name)
{
// Use contains so that we can negative cache.
if (!colCache.containsKey(name)) {
final int columnIndex = signature.indexOf(name);
if (columnIndex < 0) {
colCache.put(name, null);
} else {
final ColumnType columnType = signature
.getColumnType(columnIndex)
.orElseThrow(() -> new ISE("just got the id, why is columnType not there?"));

colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame));
}
}
return colCache.get(name);

}

@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
if (StorageAdapter.class.equals(clazz)) {
return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY);
}
if (WireTransferable.class.equals(clazz)) {
return (T) this;
}
return null;
}

@Override
public void close()
{
// nothing to close
}
}
@ -37,15 +37,15 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;

public class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
{
private final Frame frame;
private final RowSignature signature;
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();

public FrameRowsAndColumns(Frame frame, RowSignature signature)
public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
{
this.frame = FrameType.COLUMNAR.ensureType(frame);
this.frame = FrameType.ROW_BASED.ensureType(frame);
this.signature = signature;
}

@ -65,7 +65,6 @@ public class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, Close
@Override
public Column findColumn(String name)
{
// Use contains so that we can negative cache.
if (!colCache.containsKey(name)) {
final int columnIndex = signature.indexOf(name);
if (columnIndex < 0) {

@ -79,7 +78,6 @@ public class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, Close
}
}
return colCache.get(name);

}

@SuppressWarnings("unchecked")
@ -22,8 +22,8 @@ package org.apache.druid.query.rowsandcols;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumnsTest;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
import org.junit.Assert;
import org.junit.Test;

@ -67,7 +67,7 @@ public abstract class RowsAndColumnsTestBase
new Object[]{ArrayListRowsAndColumns.class, ArrayListRowsAndColumnsTest.MAKER},
new Object[]{ConcatRowsAndColumns.class, ConcatRowsAndColumnsTest.MAKER},
new Object[]{RearrangedRowsAndColumns.class, RearrangedRowsAndColumnsTest.MAKER},
new Object[]{FrameRowsAndColumns.class, FrameRowsAndColumnsTest.MAKER},
new Object[]{ColumnBasedFrameRowsAndColumns.class, ColumnBasedFrameRowsAndColumnsTest.MAKER},
new Object[]{StorageAdapterRowsAndColumns.class, StorageAdapterRowsAndColumnsTest.MAKER}
);
}
@ -19,8 +19,8 @@

package org.apache.druid.query.rowsandcols;

import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumnsTest;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.StorageAdapter;

import java.util.function.Function;

@ -38,7 +38,7 @@ public class StorageAdapterRowsAndColumnsTest extends RowsAndColumnsTestBase

private static StorageAdapterRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
{
FrameRowsAndColumns fRAC = FrameRowsAndColumnsTest.buildFrame(input);
ColumnBasedFrameRowsAndColumns fRAC = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input);
return new StorageAdapterRowsAndColumns(fRAC.as(StorageAdapter.class));
}
}
@ -25,24 +25,24 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumnsTestBase;
import java.util.function.Function;

public class FrameRowsAndColumnsTest extends RowsAndColumnsTestBase
public class ColumnBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase
{
public FrameRowsAndColumnsTest()
public ColumnBasedFrameRowsAndColumnsTest()
{
super(FrameRowsAndColumns.class);
super(ColumnBasedFrameRowsAndColumns.class);
}

public static Function<MapOfColumnsRowsAndColumns, FrameRowsAndColumns> MAKER = input -> {
public static Function<MapOfColumnsRowsAndColumns, ColumnBasedFrameRowsAndColumns> MAKER = input -> {
return buildFrame(input);
};

public static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
{
LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null);

rac.numRows(); // materialize

return (FrameRowsAndColumns) rac.getBase();
return (ColumnBasedFrameRowsAndColumns) rac.getBase();
}
}
@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;

@ -129,6 +130,91 @@ public class RowsAndColumnsDecoratorTest extends SemanticTestBase
}
}

@Test
public void testDecorationWithListOfResultRows()
{
ArrayList<ResultRow> resultRowArrayList = new ArrayList<>();

resultRowArrayList.add(ResultRow.of(1L, 1L, 123L, 0L));
resultRowArrayList.add(ResultRow.of(2L, 2L, 456L, 1L));
resultRowArrayList.add(ResultRow.of(3L, 3L, 789L, 2L));
resultRowArrayList.add(ResultRow.of(4L, 4L, 123L, 3L));
resultRowArrayList.add(ResultRow.of(5L, 5L, 456L, 4L));
resultRowArrayList.add(ResultRow.of(6L, 6L, 789L, 5L));
resultRowArrayList.add(ResultRow.of(7L, 7L, 123L, 6L));
resultRowArrayList.add(ResultRow.of(8L, 8L, 456L, 7L));
resultRowArrayList.add(ResultRow.of(9L, 9L, 789L, 8L));
resultRowArrayList.add(ResultRow.of(10L, 10L, 123L, 9L));
resultRowArrayList.add(ResultRow.of(11L, 11L, 456L, 10L));
resultRowArrayList.add(ResultRow.of(12L, 12L, 789L, 11L));

RowSignature siggy = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim", ColumnType.LONG)
.add("val", ColumnType.LONG)
.add("arrayIndex", ColumnType.LONG)
.build();

final RowsAndColumns base = make(MapOfColumnsRowsAndColumns.fromResultRow(resultRowArrayList, siggy));

Object[][] vals = new Object[][]{
{1L, 1L, 123L, 0L},
{2L, 2L, 456L, 1L},
{3L, 3L, 789L, 2L},
{4L, 4L, 123L, 3L},
{5L, 5L, 456L, 4L},
{6L, 6L, 789L, 5L},
{7L, 7L, 123L, 6L},
{8L, 8L, 456L, 7L},
{9L, 9L, 789L, 8L},
{10L, 10L, 123L, 9L},
{11L, 11L, 456L, 10L},
{12L, 12L, 789L, 11L},
};

Interval[] intervals = new Interval[]{Intervals.utc(0, 6), Intervals.utc(6, 13), Intervals.utc(4, 8)};
Filter[] filters = new Filter[]{
new InDimFilter("dim", ImmutableSet.of("a", "b", "c", "e", "g")),
new AndFilter(Arrays.asList(
new InDimFilter("dim", ImmutableSet.of("a", "b", "g")),
new SelectorFilter("val", "789")
)),
new OrFilter(Arrays.asList(
new SelectorFilter("dim", "b"),
new SelectorFilter("val", "789")
)),
new SelectorFilter("dim", "f")
};
int[] limits = new int[]{3, 6, 100};
List<ColumnWithDirection>[] orderings = new List[]{
Arrays.asList(ColumnWithDirection.descending("__time"), ColumnWithDirection.ascending("dim")),
Collections.singletonList(ColumnWithDirection.ascending("val"))
};

// call the same method multiple times

for (int i = 0; i <= intervals.length; ++i) {
Interval interval = (i == 0 ? null : intervals[i - 1]);
for (int j = 0; j < filters.length; ++j) {
Filter filter = (j == 0 ? null : filters[j - 1]);
for (int k = 0; k <= limits.length; ++k) {
int limit = (k == 0 ? -1 : limits[k - 1]);
for (int l = 0; l <= orderings.length; ++l) {
validateDecorated(
base,
siggy,
vals,
interval,
filter,
OffsetLimit.limit(limit),
l == 0 ? null : orderings[l - 1]
);
}
}
}
}
}

private void validateDecorated(
RowsAndColumns base,
RowSignature siggy,
@ -65,7 +65,6 @@ import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.checkerframework.checker.nullness.qual.Nullable;

@ -622,15 +621,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
@Override
public void validateCall(SqlCall call, SqlValidatorScope scope)
{
if (call.getKind() == SqlKind.OVER) {
if (!plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
throw buildCalciteContextException(
StringUtils.format(
"The query contains window functions; To run these window functions, specify [%s] in query context.",
PlannerContext.CTX_ENABLE_WINDOW_FNS),
call);
}
}
if (call.getKind() == SqlKind.NULLS_FIRST) {
SqlNode op0 = call.getOperandList().get(0);
if (op0.getKind() == SqlKind.DESCENDING) {
@ -84,10 +84,6 @@ public class PlannerContext
*/
public static final String CTX_SQL_OUTER_LIMIT = "sqlOuterLimit";

/**
* Key to enable window functions.
*/
public static final String CTX_ENABLE_WINDOW_FNS = "enableWindowing";

/**
* Context key for {@link PlannerContext#isUseBoundsAndSelectors()}.

@ -574,16 +570,10 @@ public class PlannerContext
/**
* Checks if the current {@link SqlEngine} supports a particular feature.
*
* When executing a specific query, use this method instead of {@link SqlEngine#featureAvailable(EngineFeature)},
* because it also verifies feature flags such as {@link #CTX_ENABLE_WINDOW_FNS}.
* When executing a specific query, use this method instead of {@link SqlEngine#featureAvailable(EngineFeature)}
*/
public boolean featureAvailable(final EngineFeature feature)
{
if (feature == EngineFeature.WINDOW_FUNCTIONS &&
!QueryContexts.getAsBoolean(CTX_ENABLE_WINDOW_FNS, queryContext.get(CTX_ENABLE_WINDOW_FNS), false)) {
// Short-circuit: feature requires context flag.
return false;
}
if (feature == EngineFeature.TIME_BOUNDARY_QUERY && !queryContext().isTimeBoundaryPlanningEnabled()) {
// Short-circuit: feature requires context flag.
return false;
@ -1479,13 +1479,18 @@ public class DruidQuery
.addAll(windowing.getOperators())
.build();
}
// if planning in native set to null
// if planning in MSQ set to empty list
// This would cause MSQ queries to plan as
// Window over an inner scan and avoid
// leaf operators
return new WindowOperatorQuery(
dataSource,
new LegacySegmentSpec(Intervals.ETERNITY),
plannerContext.queryContextMap(),
windowing.getSignature(),
operators,
null
plannerContext.featureAvailable(EngineFeature.WINDOW_LEAF_OPERATOR) ? ImmutableList.of() : null
);
}
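Restating the decision above as a standalone fragment, under the same names used in the hunk; this is a sketch for clarity, not additional patch code.

// Sketch of the leafOperators choice made above:
// - native engine: WINDOW_LEAF_OPERATOR is unavailable, pass null and let the planner
//   generate the leaf operators for the window query itself;
// - MSQ engine: the feature is available, pass an empty list so the window query is
//   planned over an inner scan query and no leaf operators are materialized.
final List<OperatorFactory> leafOperators =
    plannerContext.featureAvailable(EngineFeature.WINDOW_LEAF_OPERATOR)
    ? ImmutableList.of()
    : null;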
@ -164,6 +164,7 @@ public class DruidJoinRule extends RelOptRule
final Project rightProject = right.getPartialDruidQuery().getSelectProject();

// Right-side projection expressions rewritten to be on top of the join.

for (final RexNode rexNode : RexUtil.shift(rightProject.getProjects(), newLeft.getRowType().getFieldCount())) {
if (join.getJoinType().generatesNullsOnRight()) {
newProjectExprs.add(makeNullableIfLiteral(rexNode, rexBuilder));
@ -92,6 +92,11 @@ public enum EngineFeature
*/
WINDOW_FUNCTIONS,

/**
* Used to ignore leaf operators when planning for MSQ engine
*/
WINDOW_LEAF_OPERATOR,

/**
* Queries can use {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#UNNEST}.
*/
@ -116,6 +116,7 @@ public class NativeSqlEngine implements SqlEngine
case WRITE_EXTERNAL_DATA:
case SCAN_ORDER_BY_NON_TIME:
case SCAN_NEEDS_SIGNATURE:
case WINDOW_LEAF_OPERATOR:
return false;
default:
throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature);
@ -68,6 +68,7 @@ public class ViewSqlEngine implements SqlEngine
case ALLOW_TOP_LEVEL_UNION_ALL:
return true;
// Views can't sit on top of INSERT or REPLACE.
case WINDOW_LEAF_OPERATOR:
case CAN_INSERT:
case CAN_REPLACE:
return false;
@ -14925,7 +14925,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testUnSupportedNullsFirst()
{
DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 DESC NULLS FIRST) from druid.foo")
.run());

@ -14936,7 +14935,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testUnSupportedNullsLast()
{
DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 NULLS LAST) from druid.foo")
.run());
assertThat(e, invalidSqlIs("ASCENDING ordering with NULLS LAST is not supported! (line [1], column [41])"));

@ -14948,7 +14946,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);

DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("The query contains a window frame which may return incorrect results. To disregard this warning, set [windowingStrictValidation] to false in the query context. (line [1], column [31])"));

@ -14960,7 +14957,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);

DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN dim1 PRECEDING AND dim1 FOLLOWING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("Window frames with expression based lower/upper bounds are not supported. (line [1], column [31])"));

@ -14974,30 +14970,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest

DruidException e;
e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));

e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));
}

@Test
public void testWindowingErrorWithoutFeatureFlag()
{
DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, false))
.sql("SELECT dim1,ROW_NUMBER() OVER () from druid.foo")
.run());

assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, specify [enableWindowing] in query context. (line [1], column [13])"));
}

@Test
public void testNtileNotSupportedWithFrame()
{

@ -15006,7 +14988,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
DruidException e = assertThrows(
DruidException.class,
() -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT ntile(4) OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) from druid.foo")
.run()
);

@ -15214,7 +15195,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest

testBuilder()
.sql(sql)
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.expectedQuery(
WindowOperatorQueryBuilder.builder()
.setDataSource(

@ -15302,7 +15282,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.queryContext(
ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true
)
)

@ -15403,7 +15382,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.queryContext(
ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true
)
)
@ -20,10 +20,8 @@
package org.apache.druid.sql.calcite;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ -54,7 +52,6 @@ public class CalciteSysQueryTest extends BaseCalciteQueryTest
msqIncompatible();

testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("select datasource, sum(duration) over () from sys.tasks group by datasource")
.expectedResults(ImmutableList.of(
new Object[]{"foo", 11L},
@ -35,7 +35,6 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.CalciteWindowQueryTest.WindowQueryTestInputClass.TestType;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.QueryVerification.QueryResultsVerifier;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.Assert;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@ -198,7 +197,6 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
.skipVectorize(true)
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false
))

@ -220,8 +218,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
testBuilder()
.skipVectorize(true)
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
.queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true,
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000",
QueryContexts.WINDOWING_STRICT_VALIDATION, false
)
@ -57,7 +57,6 @@ import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.planner.PlannerCaptureHook;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime;

@ -481,7 +480,6 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
testBuilder()
.skipVectorize(true)
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
PlannerCaptureHook.NEED_CAPTURE_HOOK, true,
QueryContexts.ENABLE_DEBUG, true)
)
@ -86,6 +86,8 @@ public class IngestionTestSqlEngine implements SqlEngine
case SCAN_NEEDS_SIGNATURE:
case UNNEST:
case GROUPBY_IMPLICITLY_SORTS:
case WINDOW_FUNCTIONS:
case WINDOW_LEAF_OPERATOR:
return false;
case CAN_INSERT:
case CAN_REPLACE:
@ -2,7 +2,7 @@ type: "operatorValidation"

sql: |
SELECT
m1,
m2, m1,
SUM(m1) OVER () cc
FROM druid.foo

@ -18,9 +18,9 @@ expectedOperators:
name: "w0"
fieldName: "m1"
expectedResults:
- [1.0,21.0]
- [2.0,21.0]
- [3.0,21.0]
- [4.0,21.0]
- [5.0,21.0]
- [6.0,21.0]
- [1.0, 1.0, 21.0]
- [2.0, 2.0, 21.0]
- [3.0, 3.0, 21.0]
- [4.0,4.0,21.0]
- [5.0,5.0,21.0]
- [6.0,6.0,21.0]