diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 6d8cfdfd277..2bf21397ffb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; import org.apache.druid.query.groupby.ResultRow; -import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; import org.apache.druid.query.operator.OffsetLimit; import org.apache.druid.query.operator.Operator; import org.apache.druid.query.operator.OperatorFactory; @@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final WindowOperatorQuery query; private final List operatorFactoryList; + private final List partitionColumnNames; private final ObjectMapper jsonMapper; private final ArrayList frameRowsAndCols; private final ArrayList resultRowAndCols; @@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final FrameReader frameReader; private final ArrayList objectsOfASingleRac; private final int maxRowsMaterialized; - List partitionColsIndex; private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed private Cursor frameCursor = null; private Supplier rowSupplierFromFrameCursor; @@ -97,7 +96,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor final List operatorFactoryList, final RowSignature rowSignature, final boolean isOverEmpty, - final int maxRowsMaterializedInWindow + final int maxRowsMaterializedInWindow, + final List partitionColumnNames ) { this.inputChannel = inputChannel; @@ -110,9 +110,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor this.frameRowsAndCols = new ArrayList<>(); this.resultRowAndCols = new ArrayList<>(); this.objectsOfASingleRac = new ArrayList<>(); - this.partitionColsIndex = new ArrayList<>(); this.isOverEmpty = isOverEmpty; this.maxRowsMaterialized = maxRowsMaterializedInWindow; + this.partitionColumnNames = partitionColumnNames; } @Override @@ -177,12 +177,12 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor * * Future thoughts: {@link https://github.com/apache/druid/issues/16126} * - * 1. We are writing 1 partition to each frame in this way. In case of low cardinality data - * we will me making a large number of small frames. We can have a check to keep size of frame to a value + * 1. We are writing 1 partition to each frame in this way. In case of high cardinality data + * we will be making a large number of small frames. We can have a check to keep size of frame to a value * say 20k rows and keep on adding to the same pending frame and not create a new frame * * 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data - * with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause + * with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause * Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. * We might think to reimplement them in the MSQ way so that we do not have to materialize so much data */ @@ -218,7 +218,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor final Frame frame = inputChannel.read(); frameCursor = FrameProcessors.makeCursor(frame, frameReader); final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory(); - partitionColsIndex = findPartitionColumns(frameReader.signature()); final Supplier[] fieldSuppliers = new Supplier[frameReader.signature().size()]; for (int i = 0; i < fieldSuppliers.length; i++) { final ColumnValueSelector selector = @@ -259,18 +258,17 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor if (outputRow == null) { outputRow = currentRow; objectsOfASingleRac.add(currentRow); - } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) { + } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) { // if they have the same partition key // keep adding them after checking // guardrails + objectsOfASingleRac.add(currentRow); if (objectsOfASingleRac.size() > maxRowsMaterialized) { throw new MSQException(new TooManyRowsInAWindowFault( objectsOfASingleRac.size(), maxRowsMaterialized )); } - objectsOfASingleRac.add(currentRow); - } else { // key change noted // create rac from the rows seen before @@ -484,37 +482,36 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor frameRowsAndCols.add(ldrc); } - private List findPartitionColumns(RowSignature rowSignature) - { - List indexList = new ArrayList<>(); - for (OperatorFactory of : operatorFactoryList) { - if (of instanceof NaivePartitioningOperatorFactory) { - for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) { - indexList.add(rowSignature.indexOf(s)); - } - } - } - return indexList; - } - /** - * - * Compare two rows based only the columns in the partitionIndices - * In case the parition indices is empty or null compare entire row - * + * Compare two rows based on the columns in partitionColumnNames. + * If the partitionColumnNames is empty or null, compare entire row. + *

+ * For example, say: + *

    + *
  • partitionColumnNames = ["d1", "d2"]
  • + *
  • frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}
  • + *
  • frameReader.signature.indexOf("d1") = 0
  • + *
  • frameReader.signature.indexOf("d2") = 1
  • + *
  • row1 = [d1_row1, d2_row1, p0_row1]
  • + *
  • row2 = [d1_row2, d2_row2, p0_row2]
  • + *
+ *

+ * Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise. + * Returning true would indicate that these 2 rows can be put into the same partition for window function processing. */ - private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List partitionIndices) + private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List partitionColumnNames) { - if (partitionIndices == null || partitionIndices.isEmpty()) { + if (partitionColumnNames == null || partitionColumnNames.isEmpty()) { return row1.equals(row2); } else { int match = 0; - for (int i : partitionIndices) { + for (String columnName : partitionColumnNames) { + int i = frameReader.signature().indexOf(columnName); if (Objects.equals(row1.get(i), row2.get(i))) { match++; } } - return match == partitionIndices.size(); + return match == partitionColumnNames.size(); } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java index fbbc0a0fc3e..d9c14390736 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java @@ -61,6 +61,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor private final RowSignature stageRowSignature; private final boolean isEmptyOver; private final int maxRowsMaterializedInWindow; + private final List partitionColumnNames; @JsonCreator public WindowOperatorQueryFrameProcessorFactory( @@ -68,7 +69,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor @JsonProperty("operatorList") List operatorFactoryList, @JsonProperty("stageRowSignature") RowSignature stageRowSignature, @JsonProperty("emptyOver") boolean emptyOver, - @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow + @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow, + @JsonProperty("partitionColumnNames") List partitionColumnNames ) { this.query = Preconditions.checkNotNull(query, "query"); @@ -76,6 +78,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature"); this.isEmptyOver = emptyOver; this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow; + this.partitionColumnNames = partitionColumnNames; } @JsonProperty("query") @@ -90,6 +93,12 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor return operatorList; } + @JsonProperty("partitionColumnNames") + public List getPartitionColumnNames() + { + return partitionColumnNames; + } + @JsonProperty("stageRowSignature") public RowSignature getSignature() { @@ -148,7 +157,6 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor readableInput -> { final OutputChannel outputChannel = outputChannels.get(readableInput.getStagePartition().getPartitionNumber()); - return new WindowOperatorQueryFrameProcessor( query, readableInput.getChannel(), @@ -159,7 +167,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor operatorList, stageRowSignature, isEmptyOver, - maxRowsMaterializedInWindow + maxRowsMaterializedInWindow, + partitionColumnNames ); } ); @@ -185,12 +194,13 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor && maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow && Objects.equals(query, that.query) && Objects.equals(operatorList, that.operatorList) + && Objects.equals(partitionColumnNames, that.partitionColumnNames) && Objects.equals(stageRowSignature, that.stageRowSignature); } @Override public int hashCode() { - return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow); + return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index d08d78ef791..3754f081a27 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -24,9 +24,12 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.kernel.HashShuffleSpec; +import org.apache.druid.msq.kernel.MixShuffleSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.ShuffleSpec; @@ -39,6 +42,7 @@ import org.apache.druid.query.operator.NaiveSortOperatorFactory; import org.apache.druid.query.operator.OperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.operator.window.WindowOperatorFactory; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import java.util.ArrayList; @@ -48,6 +52,7 @@ import java.util.Map; public class WindowOperatorQueryKit implements QueryKit { + private static final Logger log = new Logger(WindowOperatorQueryKit.class); private final ObjectMapper jsonMapper; public WindowOperatorQueryKit(ObjectMapper jsonMapper) @@ -65,13 +70,22 @@ public class WindowOperatorQueryKit implements QueryKit int minStageNumber ) { - // need to validate query first - // populate the group of operators to be processed as each stage - // the size of the operators is the number of serialized stages - // later we should also check if these can be parallelized - // check there is an empty over clause or not - List> operatorList = new ArrayList<>(); - boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperstors(originalQuery, operatorList); + // Need to validate query first. + // Populate the group of operators to be processed at each stage. + // The size of the operators is the number of serialized stages. + // Later we should also check if these can be parallelized. + // Check if there is an empty OVER() clause or not. + RowSignature rowSignature = originalQuery.getRowSignature(); + log.info("Row signature received for query is [%s].", rowSignature); + + boolean isEmptyOverPresent = originalQuery.getOperators() + .stream() + .filter(of -> of instanceof NaivePartitioningOperatorFactory) + .map(of -> (NaivePartitioningOperatorFactory) of) + .anyMatch(of -> of.getPartitionColumns().isEmpty()); + + List> operatorList = getOperatorListFromQuery(originalQuery); + log.info("Created operatorList with operator factories: [%s]", operatorList); ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount); // add this shuffle spec to the last stage of the inner query @@ -102,16 +116,14 @@ public class WindowOperatorQueryKit implements QueryKit final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); final int maxRowsMaterialized; - RowSignature rowSignature = queryToRun.getRowSignature(); + if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { - maxRowsMaterialized = (int) originalQuery.context() - .get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); + maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); } else { maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; } - - if (isEmptyOverFound) { + if (isEmptyOverPresent) { // empty over clause found // moving everything to a single partition queryDefBuilder.add( @@ -125,28 +137,59 @@ public class WindowOperatorQueryKit implements QueryKit queryToRun.getOperators(), rowSignature, true, - maxRowsMaterialized + maxRowsMaterialized, + new ArrayList<>() )) ); } else { - // there are multiple windows present in the query - // Create stages for each window in the query - // These stages will be serialized - // the partition by clause of the next window will be the shuffle key for the previous window + // There are multiple windows present in the query. + // Create stages for each window in the query. + // These stages will be serialized. + // The partition by clause of the next window will be the shuffle key for the previous window. RowSignature.Builder bob = RowSignature.builder(); - final int numberOfWindows = operatorList.size(); - final int baseSize = rowSignature.size() - numberOfWindows; - for (int i = 0; i < baseSize; i++) { - bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get()); + RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature(); + log.info("Row signature received from last stage is [%s].", signatureFromInput); + + for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) { + bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get()); } - for (int i = 0; i < numberOfWindows; i++) { - bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build(); + List partitionColumnNames = new ArrayList<>(); + + /* + operatorList is a List>, where each List corresponds to the operator factories + to be used for a different window stage. + + We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder. + */ + for (int i = 0; i < operatorList.size(); i++) { + for (OperatorFactory operatorFactory : operatorList.get(i)) { + if (operatorFactory instanceof WindowOperatorFactory) { + List outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames(); + + // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, + // since they need to be present in the row signature for this window stage. + for (String columnName : outputColumnNames) { + int indexInRowSignature = rowSignature.indexOf(columnName); + if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { + ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); + bob.add(columnName, columnType); + log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); + } else { + throw new ISE( + "Found unexpected column [%s] already present in row signature [%s].", + columnName, + rowSignature + ); + } + } + } + } + // find the shuffle spec of the next stage // if it is the last stage set the next shuffle spec to single partition - if (i + 1 == numberOfWindows) { - nextShuffleSpec = ShuffleSpecFactories.singlePartition() - .build(ClusterBy.none(), false); + if (i + 1 == operatorList.size()) { + nextShuffleSpec = MixShuffleSpec.instance(); } else { nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount); } @@ -162,6 +205,28 @@ public class WindowOperatorQueryKit implements QueryKit ); } + log.info("Using row signature [%s] for window stage.", stageRowSignature); + + boolean partitionOperatorExists = false; + List currentPartitionColumns = new ArrayList<>(); + for (OperatorFactory of : operatorList.get(i)) { + if (of instanceof NaivePartitioningOperatorFactory) { + for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) { + currentPartitionColumns.add(s); + partitionOperatorExists = true; + } + } + } + + if (partitionOperatorExists) { + partitionColumnNames = currentPartitionColumns; + } + + log.info( + "Columns which would be used to define partitioning boundaries for this window stage are [%s]", + partitionColumnNames + ); + queryDefBuilder.add( StageDefinition.builder(firstStageNumber + i) .inputs(new StageInputSpec(firstStageNumber + i - 1)) @@ -173,7 +238,8 @@ public class WindowOperatorQueryKit implements QueryKit operatorList.get(i), stageRowSignature, false, - maxRowsMaterialized + maxRowsMaterialized, + partitionColumnNames )) ); } @@ -184,14 +250,12 @@ public class WindowOperatorQueryKit implements QueryKit /** * * @param originalQuery - * @param operatorList - * @return true if the operator List has a partitioning operator with an empty OVER clause, false otherwise + * @return A list of list of operator factories, where each list represents the operator factories for a particular + * window stage. */ - private boolean ifEmptyOverPresentInWindowOperstors( - WindowOperatorQuery originalQuery, - List> operatorList - ) + private List> getOperatorListFromQuery(WindowOperatorQuery originalQuery) { + List> operatorList = new ArrayList<>(); final List operators = originalQuery.getOperators(); List operatorFactoryList = new ArrayList<>(); for (OperatorFactory of : operators) { @@ -203,18 +267,17 @@ public class WindowOperatorQueryKit implements QueryKit if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) { operatorList.clear(); operatorList.add(originalQuery.getOperators()); - return true; + return operatorList; } } } - return false; + return operatorList; } private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int maxWorkerCount) { NaivePartitioningOperatorFactory partition = null; NaiveSortOperatorFactory sort = null; - List keyColsOfWindow = new ArrayList<>(); for (OperatorFactory of : operatorFactories) { if (of instanceof NaivePartitioningOperatorFactory) { partition = (NaivePartitioningOperatorFactory) of; @@ -222,29 +285,31 @@ public class WindowOperatorQueryKit implements QueryKit sort = (NaiveSortOperatorFactory) of; } } - Map colMap = new HashMap<>(); + + Map sortColumnsMap = new HashMap<>(); if (sort != null) { for (ColumnWithDirection sortColumn : sort.getSortColumns()) { - colMap.put(sortColumn.getColumn(), sortColumn.getDirection()); + sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection()); } } - assert partition != null; - if (partition.getPartitionColumns().isEmpty()) { + + if (partition == null || partition.getPartitionColumns().isEmpty()) { + // If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. + // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. return null; } + + List keyColsOfWindow = new ArrayList<>(); for (String partitionColumn : partition.getPartitionColumns()) { KeyColumn kc; - if (colMap.containsKey(partitionColumn)) { - if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) { - kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); - } else { - kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); - } + if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) { + kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); } else { kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); } keyColsOfWindow.add(kc); } + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java new file mode 100644 index 00000000000..2049c0194ed --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class WindowOperatorQueryFrameProcessorFactoryTest +{ + @Test + public void testEqualsAndHashcode() + { + EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class) + .withNonnullFields("query", "operatorList", "stageRowSignature", "isEmptyOver", "maxRowsMaterializedInWindow", "partitionColumnNames") + .usingGetClass() + .verify(); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 54552e5d5b0..eaa2a9efe5a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -39,6 +39,7 @@ import org.apache.druid.guice.JoinableFactoryModule; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; @@ -62,12 +63,14 @@ import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; @@ -91,6 +94,7 @@ import javax.annotation.Nullable; import java.io.File; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; @@ -99,6 +103,7 @@ import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5; +import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA; import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1; @@ -205,6 +210,17 @@ public class CalciteMSQTestsHelper { final QueryableIndex index; switch (segmentId.getDataSource()) { + case WIKIPEDIA: + try { + final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", UUID.randomUUID())); + final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex(); + TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null); + index = TestIndex.INDEX_IO.loadIndex(directory); + } + catch (Exception e) { + throw new RuntimeException(e); + } + break; case DATASOURCE1: IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder() .withMetrics( diff --git a/processing/src/main/java/org/apache/druid/query/operator/Operator.java b/processing/src/main/java/org/apache/druid/query/operator/Operator.java index a9a18c36d54..57bc1013fc4 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java +++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java @@ -126,7 +126,7 @@ public interface Operator */ STOP, /** - * Inidcates that the downstream processing should pause its pushing of results and instead return a + * Indicates that the downstream processing should pause its pushing of results and instead return a * continuation object that encapsulates whatever state is required to resume processing. When this signal is * received, Operators that are generating data might choose to exert backpressure or otherwise pause their * processing efforts until called again with the returned continuation object. diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java index a4fa74967f6..0e0fc59498c 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java @@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class ComposingProcessor implements Processor { @@ -37,6 +39,16 @@ public class ComposingProcessor implements Processor this.processors = processors; } + @Override + public List getOutputColumnNames() + { + List outputColumnNames = new ArrayList<>(); + for (Processor processor : processors) { + outputColumnNames.addAll(processor.getOutputColumnNames()); + } + return outputColumnNames; + } + @JsonProperty("processors") public Processor[] getProcessors() { diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java b/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java index fe8d125cbdf..b271d3064ef 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java @@ -31,6 +31,8 @@ import org.apache.druid.query.operator.window.value.WindowLastProcessor; import org.apache.druid.query.operator.window.value.WindowOffsetProcessor; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import java.util.List; + /** * A Processor is a bit of logic that processes a single RowsAndColumns object to produce a new RowsAndColumns * object. Generally speaking, it is used to add or alter columns in a batch-oriented fashion. @@ -80,4 +82,9 @@ public interface Processor * @return boolean identifying if these processors should be considered equivalent to each other. */ boolean validateEquivalent(Processor otherProcessor); + + /** + * @return List of output column names for the Processor. + */ + List getOutputColumnNames(); } diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java index 3545c3740f4..41baced4e61 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java @@ -27,7 +27,9 @@ import org.apache.druid.query.rowsandcols.semantic.DefaultFramedOnHeapAggregatab import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Objects; public class WindowFramedAggregateProcessor implements Processor @@ -45,6 +47,16 @@ public class WindowFramedAggregateProcessor implements Processor private final WindowFrame frame; private final AggregatorFactory[] aggregations; + @Override + public List getOutputColumnNames() + { + List outputColumnNames = new ArrayList<>(); + for (AggregatorFactory aggregation : aggregations) { + outputColumnNames.add(aggregation.getName()); + } + return outputColumnNames; + } + @JsonCreator public WindowFramedAggregateProcessor( @JsonProperty("frame") WindowFrame frame, diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java index 541c1399e36..b7f77d50969 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java @@ -28,12 +28,20 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import java.util.Arrays; +import java.util.Collections; +import java.util.List; public class WindowPercentileProcessor implements Processor { private final int numBuckets; private final String outputColumn; + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } + @JsonCreator public WindowPercentileProcessor( @JsonProperty("outputColumn") String outputColumn, diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java index fb5bedf9519..4e026cbdd3d 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java @@ -27,6 +27,7 @@ import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner; import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Function; @@ -124,4 +125,9 @@ public abstract class WindowRankingProcessorBase implements Processor return Objects.equals(groupingCols, other.groupingCols) && Objects.equals(outputColumn, other.outputColumn); } + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java index 7821e3fd53b..98b09b6f80d 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java @@ -28,6 +28,9 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.segment.column.ColumnType; +import java.util.Collections; +import java.util.List; + public class WindowRowNumberProcessor implements Processor { private final String outputColumn; @@ -128,4 +131,10 @@ public class WindowRowNumberProcessor implements Processor "outputColumn='" + outputColumn + '\'' + '}'; } + + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java index 2e084ae983a..93a7ccd9a5b 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java @@ -26,6 +26,8 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; +import java.util.Collections; +import java.util.List; import java.util.function.Function; public abstract class WindowValueProcessorBase implements Processor @@ -100,4 +102,10 @@ public abstract class WindowValueProcessorBase implements Processor return "inputColumn=" + inputColumn + ", outputColumn='" + outputColumn + '\''; } + + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } } diff --git a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java index 9cce74cb98c..c11a50cf5cb 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java @@ -27,6 +27,9 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; +import java.util.List; + public class WindowProcessorOperatorTest { @Test @@ -53,6 +56,12 @@ public class WindowProcessorOperatorTest { return true; } + + @Override + public List getOutputColumnNames() + { + return Collections.emptyList(); + } }, InlineScanOperator.make(rac) ); diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java index 570cba65d92..d8f4599eb1a 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java @@ -23,6 +23,9 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; +import java.util.List; + public class ComposingProcessorTest { @Test @@ -32,6 +35,7 @@ public class ComposingProcessorTest final ProcessorForTesting secondProcessor = new ProcessorForTesting(); ComposingProcessor proc = new ComposingProcessor(firstProcessor, secondProcessor); + Assert.assertTrue(proc.getOutputColumnNames().isEmpty()); proc.process(null); Assert.assertEquals(1, firstProcessor.processCounter); @@ -70,5 +74,11 @@ public class ComposingProcessorTest ++validateCounter; return validationResult; } + + @Override + public List getOutputColumnNames() + { + return Collections.emptyList(); + } } } diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java index 88d79c87cdb..5af321b53c8 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.common.config.NullHandling; @@ -51,6 +52,7 @@ public class WindowFramedAggregateProcessorTest new DoubleSumAggregatorFactory("cummSum", "doubleCol") }; WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs); + Assert.assertEquals(ImmutableList.of("cummMax", "cummSum"), proc.getOutputColumnNames()); final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of( "yay", new IntArrayColumn(new int[]{1, 2, 3}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java index f5914e4f5db..877c7841549 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java @@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -42,6 +43,7 @@ public class WindowCumeDistProcessorTest MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map); Processor processor = new WindowCumeDistProcessor(Collections.singletonList("vals"), "CumeDist"); + Assert.assertEquals(Collections.singletonList("CumeDist"), processor.getOutputColumnNames()); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java index e165f46f074..86580e5bd2f 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java @@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -42,6 +43,7 @@ public class WindowDenseRankProcessorTest MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map); Processor processor = new WindowDenseRankProcessor(Collections.singletonList("vals"), "DenseRank"); + Assert.assertEquals(Collections.singletonList("DenseRank"), processor.getOutputColumnNames()); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java index c38cd2a245c..bf5bb727b0a 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.ranking; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.Processor; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; @@ -29,6 +30,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; import java.util.LinkedHashMap; @@ -63,6 +65,11 @@ public class WindowPercentileProcessorTest new WindowPercentileProcessor("10292", 10292) ); + Assert.assertEquals( + ImmutableList.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "10292"), + processor.getOutputColumnNames() + ); + final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9}) .expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java index 59c7dd6df36..b7f281c423e 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.ranking; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.Processor; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; @@ -26,6 +27,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -49,6 +51,8 @@ public class WindowRankProcessorTest new WindowRankProcessor(orderingCols, "rankAsPercent", true) ); + Assert.assertEquals(ImmutableList.of("rank", "rankAsPercent"), processor.getOutputColumnNames()); + final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290}) .expectColumn("rank", new int[]{1, 2, 2, 4, 5, 6, 7, 7, 9, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java index 937fea7c360..f4f9b5bfeee 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java @@ -28,8 +28,10 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -49,6 +51,7 @@ public class WindowRowNumberProcessorTest MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map); Processor processor = new WindowRowNumberProcessor("rowRow"); + Assert.assertEquals(Collections.singletonList("rowRow"), processor.getOutputColumnNames()); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java index 67242f05503..eb6caa10a0b 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.value; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; @@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; import java.util.LinkedHashMap; @@ -59,6 +61,11 @@ public class WindowFirstProcessorTest new WindowFirstProcessor("nullFirstCol", "NullFirstCol") ); + Assert.assertEquals( + ImmutableList.of("FirstIntCol", "FirstDoubleCol", "FirstObjectCol", "NullFirstCol"), + processor.getOutputColumnNames() + ); + final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9}) .expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java index 5aa212b6acb..1910401f34a 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.value; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; @@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; import java.util.LinkedHashMap; @@ -58,6 +60,10 @@ public class WindowLastProcessorTest new WindowLastProcessor("objectCol", "LastObjectCol"), new WindowLastProcessor("nullLastCol", "NullLastCol") ); + Assert.assertEquals( + ImmutableList.of("LastIntCol", "LastDoubleCol", "LastObjectCol", "NullLastCol"), + processor.getOutputColumnNames() + ); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java index cb7bed7e041..4e958383945 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java @@ -7533,4 +7533,78 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest { windowQueryTest(); } + + /* + Druid query tests + */ + + @DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1") + @Test + public void test_same_window_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1_named_window") + @Test + public void test_same_window_wikipedia_query_1_named_window() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/multiple_windows/wikipedia_query_1") + @Test + public void test_multiple_windows_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/multiple_windows/wikipedia_query_1_named_windows") + @Test + public void test_multiple_windows_wikipedia_query_1_named_windows() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/shuffle_columns/wikipedia_query_1") + @Test + public void test_shuffle_columns_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1") + @Test + public void test_shuffle_columns_wikipedia_query_1_shuffle_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/shuffle_columns/wikipedia_query_2") + @Test + public void test_shuffle_columns_wikipedia_query_2() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1") + @Test + public void test_shuffle_columns_wikipedia_query_2_shuffle_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_1") + @Test + public void test_partition_by_multiple_columns_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_2") + @Test + public void test_partition_by_multiple_columns_wikipedia_query_2() + { + windowQueryTest(); + } } diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e new file mode 100644 index 00000000000..3625be892e2 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e @@ -0,0 +1,13 @@ +null Austria 1 1 +null Republic of Korea 1 2 +null Republic of Korea 2 3 +null Republic of Korea 3 4 +Horsching Austria 2 1 +Jeonju Republic of Korea 4 1 +Seongnam-si Republic of Korea 5 1 +Seoul Republic of Korea 6 1 +Suwon-si Republic of Korea 7 1 +Vienna Austria 3 1 +Vienna Austria 4 2 +Vienna Austria 5 3 +Yongsan-dong Republic of Korea 8 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q new file mode 100644 index 00000000000..d61a33e401f --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q @@ -0,0 +1,6 @@ +select cityName, countryName, +row_number() over (partition by countryName order by countryName, cityName, channel) as c1, +count(channel) over (partition by cityName order by countryName, cityName, channel) as c2 +from wikipedia +where countryName in ('Austria', 'Republic of Korea') +group by countryName, cityName, channel diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e new file mode 100644 index 00000000000..3625be892e2 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e @@ -0,0 +1,13 @@ +null Austria 1 1 +null Republic of Korea 1 2 +null Republic of Korea 2 3 +null Republic of Korea 3 4 +Horsching Austria 2 1 +Jeonju Republic of Korea 4 1 +Seongnam-si Republic of Korea 5 1 +Seoul Republic of Korea 6 1 +Suwon-si Republic of Korea 7 1 +Vienna Austria 3 1 +Vienna Austria 4 2 +Vienna Austria 5 3 +Yongsan-dong Republic of Korea 8 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q new file mode 100644 index 00000000000..12739d58ceb --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q @@ -0,0 +1,9 @@ +select cityName, countryName, +row_number() over w1 as c1, +count(channel) over w2 as c2 +from wikipedia +where countryName in ('Austria', 'Republic of Korea') +group by countryName, cityName, channel +WINDOW + w1 AS (partition by countryName order by countryName, cityName, channel), + w2 AS (partition by cityName order by countryName, cityName, channel) diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e new file mode 100644 index 00000000000..36812a418ae --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e @@ -0,0 +1,15 @@ +Austria null 94 7 +Austria null 4685 7 +Austria null 14 7 +Austria null 0 7 +Austria null 272 7 +Austria null 0 7 +Austria null 6979 7 +Guatemala null 0 1 +Guatemala El Salvador 1 1 +Guatemala Guatemala City 173 1 +Austria Horsching 0 1 +Austria Vienna 93 4 +Austria Vienna 72 4 +Austria Vienna 0 4 +Austria Vienna 0 4 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q new file mode 100644 index 00000000000..5d0dd075678 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q @@ -0,0 +1,7 @@ +SELECT +countryName, +cityName, +added, +count(added) OVER (PARTITION BY countryName, cityName) +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e new file mode 100644 index 00000000000..a1b94f5a865 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e @@ -0,0 +1,15 @@ +Austria null 0 7 12044 1 +Austria null 0 7 12044 2 +Austria null 14 7 12044 1 +Austria null 94 7 12044 1 +Austria null 272 7 12044 1 +Austria null 4685 7 12044 1 +Austria null 6979 7 12044 1 +Guatemala null 0 1 0 1 +Guatemala El Salvador 1 1 1 1 +Guatemala Guatemala City 173 1 173 1 +Austria Horsching 0 1 0 1 +Austria Vienna 0 4 165 1 +Austria Vienna 0 4 165 2 +Austria Vienna 72 4 165 1 +Austria Vienna 93 4 165 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q new file mode 100644 index 00000000000..b1a594beeda --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q @@ -0,0 +1,9 @@ +SELECT +countryName, +cityName, +added, +count(added) OVER (PARTITION BY countryName, cityName), +sum(added) OVER (PARTITION BY countryName, cityName), +ROW_NUMBER() OVER (PARTITION BY countryName, cityName, added) +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e new file mode 100644 index 00000000000..0dfb6a832b8 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e @@ -0,0 +1,15 @@ +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Guatemala 167 7 174 +Guatemala 167 7 174 +Guatemala 167 7 174 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q new file mode 100644 index 00000000000..dcb83c09c23 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q @@ -0,0 +1,6 @@ +SELECT countryName, +sum("deleted") OVER (PARTITION BY countryName) as count_c3, +sum(delta) OVER (PARTITION BY countryName) as count_c1, +sum(added) OVER (PARTITION BY countryName) as count_c2 +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e new file mode 100644 index 00000000000..0dfb6a832b8 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e @@ -0,0 +1,15 @@ +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Guatemala 167 7 174 +Guatemala 167 7 174 +Guatemala 167 7 174 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q new file mode 100644 index 00000000000..adb9287d378 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q @@ -0,0 +1,7 @@ +SELECT countryName, +sum("deleted") OVER w as count_c3, +sum(delta) OVER w as count_c1, +sum(added) OVER w as count_c2 +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') +WINDOW w AS (PARTITION BY countryName) diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e new file mode 100644 index 00000000000..e934bc8fc27 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e @@ -0,0 +1,15 @@ +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Guatemala 58 +Guatemala 58 +Guatemala 58 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q new file mode 100644 index 00000000000..f1a7bcb09b1 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q @@ -0,0 +1,5 @@ +SELECT +countryName, +AVG(added) OVER(PARTITION BY countryName) +FROM wikipedia +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e new file mode 100644 index 00000000000..e74706be009 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e @@ -0,0 +1,15 @@ +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +58 Guatemala +58 Guatemala +58 Guatemala diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q new file mode 100644 index 00000000000..c2dc11546a9 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q @@ -0,0 +1,5 @@ +SELECT +AVG(added) OVER(PARTITION BY countryName), +countryName +FROM wikipedia +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e new file mode 100644 index 00000000000..daf6eff61ba --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e @@ -0,0 +1,16 @@ +Austria null 1 #de.wikipedia 1 +Guatemala null 1 #es.wikipedia 2 +Republic of Korea null 1 #en.wikipedia 3 +Republic of Korea null 2 #ja.wikipedia 4 +Republic of Korea null 3 #ko.wikipedia 5 +Guatemala El Salvador 2 #es.wikipedia 1 +Guatemala Guatemala City 3 #es.wikipedia 1 +Austria Horsching 2 #de.wikipedia 1 +Republic of Korea Jeonju 4 #ko.wikipedia 1 +Republic of Korea Seongnam-si 5 #ko.wikipedia 1 +Republic of Korea Seoul 6 #ko.wikipedia 1 +Republic of Korea Suwon-si 7 #ko.wikipedia 1 +Austria Vienna 3 #de.wikipedia 1 +Austria Vienna 4 #es.wikipedia 2 +Austria Vienna 5 #tr.wikipedia 3 +Republic of Korea Yongsan-dong 8 #ko.wikipedia 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q new file mode 100644 index 00000000000..d3ea2dfc729 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q @@ -0,0 +1,9 @@ +SELECT +countryName, +cityName, +ROW_NUMBER() OVER(PARTITION BY countryName), +channel, +COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel) +FROM wikipedia +where countryName in ('Guatemala', 'Austria', 'Republic of Korea') +group by countryName, cityName, channel diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e new file mode 100644 index 00000000000..813ccdbf6aa --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e @@ -0,0 +1,16 @@ +1 Austria null 1 #de.wikipedia +1 Guatemala null 2 #es.wikipedia +1 Republic of Korea null 3 #en.wikipedia +2 Republic of Korea null 4 #ja.wikipedia +3 Republic of Korea null 5 #ko.wikipedia +2 Guatemala El Salvador 1 #es.wikipedia +3 Guatemala Guatemala City 1 #es.wikipedia +2 Austria Horsching 1 #de.wikipedia +4 Republic of Korea Jeonju 1 #ko.wikipedia +5 Republic of Korea Seongnam-si 1 #ko.wikipedia +6 Republic of Korea Seoul 1 #ko.wikipedia +7 Republic of Korea Suwon-si 1 #ko.wikipedia +3 Austria Vienna 1 #de.wikipedia +4 Austria Vienna 2 #es.wikipedia +5 Austria Vienna 3 #tr.wikipedia +8 Republic of Korea Yongsan-dong 1 #ko.wikipedia diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q new file mode 100644 index 00000000000..779aaf3a86f --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q @@ -0,0 +1,9 @@ +SELECT +ROW_NUMBER() OVER(PARTITION BY countryName), +countryName, +cityName, +COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel), +channel +FROM wikipedia +where countryName in ('Guatemala', 'Austria', 'Republic of Korea') +group by countryName, cityName, channel