Fix issues with partitioning boundaries for MSQ window functions (#16729)

* Fix issues with partitioning boundaries for MSQ window functions

* Address review comments

* Address review comments

* Add test for coverage check failure

* Address review comment

* Remove DruidWindowQueryTest and WindowQueryTestBase, move those tests to DrillWindowQueryTest

* Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java

* Address review comments

* Add test for equals and hashcode for WindowOperatorQueryFrameProcessorFactory

* Address review comment

* Fix checkstyle

---------

Co-authored-by: Benedict Jin <asdf2014@apache.org>
Akshat Jain 2024-07-18 07:35:09 +05:30 committed by GitHub
parent 44b3f8e588
commit b53c26f5c5
44 changed files with 614 additions and 83 deletions

View File

@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final WindowOperatorQuery query;
private final List<OperatorFactory> operatorFactoryList;
private final List<String> partitionColumnNames;
private final ObjectMapper jsonMapper;
private final ArrayList<RowsAndColumns> frameRowsAndCols;
private final ArrayList<RowsAndColumns> resultRowAndCols;
@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameReader frameReader;
private final ArrayList<ResultRow> objectsOfASingleRac;
private final int maxRowsMaterialized;
List<Integer> partitionColsIndex;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
@ -97,7 +96,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
final List<OperatorFactory> operatorFactoryList,
final RowSignature rowSignature,
final boolean isOverEmpty,
final int maxRowsMaterializedInWindow
final int maxRowsMaterializedInWindow,
final List<String> partitionColumnNames
)
{
this.inputChannel = inputChannel;
@ -110,9 +110,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.partitionColsIndex = new ArrayList<>();
this.isOverEmpty = isOverEmpty;
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}
@Override
@ -177,12 +177,12 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
*
* Future thoughts: {@link https://github.com/apache/druid/issues/16126}
*
* 1. We are writing 1 partition to each frame in this way. In case of low cardinality data
* we will me making a large number of small frames. We can have a check to keep size of frame to a value
* 1. We are writing 1 partition to each frame in this way. In case of high cardinality data
* we will be making a large number of small frames. We can have a check to keep size of frame to a value
* say 20k rows and keep on adding to the same pending frame and not create a new frame
*
* 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data
* with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data.
* We might think to reimplement them in the MSQ way so that we do not have to materialize so much data
*/
@ -218,7 +218,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
partitionColsIndex = findPartitionColumns(frameReader.signature());
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
for (int i = 0; i < fieldSuppliers.length; i++) {
final ColumnValueSelector<?> selector =
@ -259,18 +258,17 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
if (outputRow == null) {
outputRow = currentRow;
objectsOfASingleRac.add(currentRow);
} else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// if they have the same partition key
// keep adding them after checking
// guardrails
objectsOfASingleRac.add(currentRow);
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
objectsOfASingleRac.size(),
maxRowsMaterialized
));
}
objectsOfASingleRac.add(currentRow);
} else {
// key change noted
// create rac from the rows seen before
@ -484,37 +482,36 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
frameRowsAndCols.add(ldrc);
}
private List<Integer> findPartitionColumns(RowSignature rowSignature)
{
List<Integer> indexList = new ArrayList<>();
for (OperatorFactory of : operatorFactoryList) {
if (of instanceof NaivePartitioningOperatorFactory) {
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
indexList.add(rowSignature.indexOf(s));
}
}
}
return indexList;
}
/**
*
* Compare two rows based only the columns in the partitionIndices
* In case the parition indices is empty or null compare entire row
*
* Compare two rows based on the columns in partitionColumnNames.
* If the partitionColumnNames is empty or null, compare entire row.
* <p>
* For example, say:
* <ul>
* <li>partitionColumnNames = ["d1", "d2"]</li>
* <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
* <li>frameReader.signature.indexOf("d1") = 0</li>
* <li>frameReader.signature.indexOf("d2") = 1</li>
* <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
* <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
* </ul>
* <p>
* Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise.
* Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
*/
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
{
if (partitionIndices == null || partitionIndices.isEmpty()) {
if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
return row1.equals(row2);
} else {
int match = 0;
for (int i : partitionIndices) {
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
if (Objects.equals(row1.get(i), row2.get(i))) {
match++;
}
}
return match == partitionIndices.size();
return match == partitionColumnNames.size();
}
}
}
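Not part of the patch — a minimal standalone sketch of the comparison described in the Javadoc above. Plain lists stand in for ResultRow and the frame's row signature, and the class and method names are made up for illustration; the real method counts matches instead of returning early, but the behavior is the same.

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class PartitionKeyComparisonSketch
{
  // Stand-in for the frame's row signature {d1, d2, p0}.
  private static final List<String> SIGNATURE = Arrays.asList("d1", "d2", "p0");

  static boolean samePartition(List<Object> row1, List<Object> row2, List<String> partitionColumnNames)
  {
    if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
      // No partition columns: compare the entire rows, per the Javadoc above.
      return row1.equals(row2);
    }
    for (String columnName : partitionColumnNames) {
      int i = SIGNATURE.indexOf(columnName);
      if (!Objects.equals(row1.get(i), row2.get(i))) {
        return false; // a partition boundary falls between row1 and row2
      }
    }
    return true; // all PARTITION BY columns match: same window partition
  }

  public static void main(String[] args)
  {
    List<String> partitionColumnNames = Arrays.asList("d1", "d2");
    // Same d1 and d2, different p0: still the same partition.
    System.out.println(samePartition(Arrays.asList("a", "b", 1), Arrays.asList("a", "b", 2), partitionColumnNames)); // true
    // d2 differs: a new partition starts here.
    System.out.println(samePartition(Arrays.asList("a", "b", 1), Arrays.asList("a", "c", 1), partitionColumnNames)); // false
  }
}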

View File

@ -61,6 +61,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
private final RowSignature stageRowSignature;
private final boolean isEmptyOver;
private final int maxRowsMaterializedInWindow;
private final List<String> partitionColumnNames;
@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(
@ -68,7 +69,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
@JsonProperty("emptyOver") boolean emptyOver,
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
@JsonProperty("partitionColumnNames") List<String> partitionColumnNames
)
{
this.query = Preconditions.checkNotNull(query, "query");
@ -76,6 +78,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
this.isEmptyOver = emptyOver;
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}
@JsonProperty("query")
@ -90,6 +93,12 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
return operatorList;
}
@JsonProperty("partitionColumnNames")
public List<String> getPartitionColumnNames()
{
return partitionColumnNames;
}
@JsonProperty("stageRowSignature")
public RowSignature getSignature()
{
@ -148,7 +157,6 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
readableInput -> {
final OutputChannel outputChannel =
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());
return new WindowOperatorQueryFrameProcessor(
query,
readableInput.getChannel(),
@ -159,7 +167,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
operatorList,
stageRowSignature,
isEmptyOver,
maxRowsMaterializedInWindow
maxRowsMaterializedInWindow,
partitionColumnNames
);
}
);
@ -185,12 +194,13 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
&& maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
&& Objects.equals(query, that.query)
&& Objects.equals(operatorList, that.operatorList)
&& Objects.equals(partitionColumnNames, that.partitionColumnNames)
&& Objects.equals(stageRowSignature, that.stageRowSignature);
}
@Override
public int hashCode()
{
return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
}
}

View File

@ -24,9 +24,12 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.HashShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
@ -39,6 +42,7 @@ import org.apache.druid.query.operator.NaiveSortOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import java.util.ArrayList;
@ -48,6 +52,7 @@ import java.util.Map;
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
private static final Logger log = new Logger(WindowOperatorQueryKit.class);
private final ObjectMapper jsonMapper;
public WindowOperatorQueryKit(ObjectMapper jsonMapper)
@ -65,13 +70,22 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
int minStageNumber
)
{
// need to validate query first
// populate the group of operators to be processed as each stage
// the size of the operators is the number of serialized stages
// later we should also check if these can be parallelized
// check there is an empty over clause or not
List<List<OperatorFactory>> operatorList = new ArrayList<>();
boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperstors(originalQuery, operatorList);
// Need to validate query first.
// Populate the group of operators to be processed at each stage.
// The size of the operators is the number of serialized stages.
// Later we should also check if these can be parallelized.
// Check if there is an empty OVER() clause or not.
RowSignature rowSignature = originalQuery.getRowSignature();
log.info("Row signature received for query is [%s].", rowSignature);
boolean isEmptyOverPresent = originalQuery.getOperators()
.stream()
.filter(of -> of instanceof NaivePartitioningOperatorFactory)
.map(of -> (NaivePartitioningOperatorFactory) of)
.anyMatch(of -> of.getPartitionColumns().isEmpty());
List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
log.info("Created operatorList with operator factories: [%s]", operatorList);
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
// add this shuffle spec to the last stage of the inner query
@ -102,16 +116,14 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
final int maxRowsMaterialized;
RowSignature rowSignature = queryToRun.getRowSignature();
if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) {
maxRowsMaterialized = (int) originalQuery.context()
.get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
} else {
maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
}
if (isEmptyOverFound) {
if (isEmptyOverPresent) {
// empty over clause found
// moving everything to a single partition
queryDefBuilder.add(
@ -125,28 +137,59 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
queryToRun.getOperators(),
rowSignature,
true,
maxRowsMaterialized
maxRowsMaterialized,
new ArrayList<>()
))
);
} else {
// there are multiple windows present in the query
// Create stages for each window in the query
// These stages will be serialized
// the partition by clause of the next window will be the shuffle key for the previous window
// There are multiple windows present in the query.
// Create stages for each window in the query.
// These stages will be serialized.
// The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
final int numberOfWindows = operatorList.size();
final int baseSize = rowSignature.size() - numberOfWindows;
for (int i = 0; i < baseSize; i++) {
bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
log.info("Row signature received from last stage is [%s].", signatureFromInput);
for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
}
for (int i = 0; i < numberOfWindows; i++) {
bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
List<String> partitionColumnNames = new ArrayList<>();
/*
operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
to be used for a different window stage.
We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
*/
for (int i = 0; i < operatorList.size(); i++) {
for (OperatorFactory operatorFactory : operatorList.get(i)) {
if (operatorFactory instanceof WindowOperatorFactory) {
List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
// Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
// since they need to be present in the row signature for this window stage.
for (String columnName : outputColumnNames) {
int indexInRowSignature = rowSignature.indexOf(columnName);
if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get();
bob.add(columnName, columnType);
log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType);
} else {
throw new ISE(
"Found unexpected column [%s] already present in row signature [%s].",
columnName,
rowSignature
);
}
}
}
}
// find the shuffle spec of the next stage
// if it is the last stage set the next shuffle spec to single partition
if (i + 1 == numberOfWindows) {
nextShuffleSpec = ShuffleSpecFactories.singlePartition()
.build(ClusterBy.none(), false);
if (i + 1 == operatorList.size()) {
nextShuffleSpec = MixShuffleSpec.instance();
} else {
nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount);
}
@ -162,6 +205,28 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
);
}
log.info("Using row signature [%s] for window stage.", stageRowSignature);
boolean partitionOperatorExists = false;
List<String> currentPartitionColumns = new ArrayList<>();
for (OperatorFactory of : operatorList.get(i)) {
if (of instanceof NaivePartitioningOperatorFactory) {
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
currentPartitionColumns.add(s);
partitionOperatorExists = true;
}
}
}
if (partitionOperatorExists) {
partitionColumnNames = currentPartitionColumns;
}
log.info(
"Columns which would be used to define partitioning boundaries for this window stage are [%s]",
partitionColumnNames
);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
@ -173,7 +238,8 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
operatorList.get(i),
stageRowSignature,
false,
maxRowsMaterialized
maxRowsMaterialized,
partitionColumnNames
))
);
}
@ -184,14 +250,12 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
/**
*
* @param originalQuery
* @param operatorList
* @return true if the operator List has a partitioning operator with an empty OVER clause, false otherwise
* @return A list of list of operator factories, where each list represents the operator factories for a particular
* window stage.
*/
private boolean ifEmptyOverPresentInWindowOperstors(
WindowOperatorQuery originalQuery,
List<List<OperatorFactory>> operatorList
)
private List<List<OperatorFactory>> getOperatorListFromQuery(WindowOperatorQuery originalQuery)
{
List<List<OperatorFactory>> operatorList = new ArrayList<>();
final List<OperatorFactory> operators = originalQuery.getOperators();
List<OperatorFactory> operatorFactoryList = new ArrayList<>();
for (OperatorFactory of : operators) {
@ -203,18 +267,17 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) {
operatorList.clear();
operatorList.add(originalQuery.getOperators());
return true;
return operatorList;
}
}
}
return false;
return operatorList;
}
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorFactories, int maxWorkerCount)
{
NaivePartitioningOperatorFactory partition = null;
NaiveSortOperatorFactory sort = null;
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (OperatorFactory of : operatorFactories) {
if (of instanceof NaivePartitioningOperatorFactory) {
partition = (NaivePartitioningOperatorFactory) of;
@ -222,29 +285,31 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
sort = (NaiveSortOperatorFactory) of;
}
}
Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
if (sort != null) {
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
}
}
assert partition != null;
if (partition.getPartitionColumns().isEmpty()) {
if (partition == null || partition.getPartitionColumns().isEmpty()) {
// If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
return null;
}
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partition.getPartitionColumns()) {
KeyColumn kc;
if (colMap.containsKey(partitionColumn)) {
if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
} else {
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
}
if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
} else {
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
}
keyColsOfWindow.add(kc);
}
return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
}
}
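For context on how a window stage's shuffle key follows from its PARTITION BY clause, here is a standalone sketch (plain Java with made-up names, not the Druid KeyColumn/ClusterBy/HashShuffleSpec API) of the decision findShuffleSpecForNextWindow makes above: no partitioning operator means the previous stage's shuffle spec is reused, otherwise each partition column becomes a shuffle key column, descending only when the sort operator asks for it.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class WindowShuffleKeySketch
{
  enum Direction { ASC, DESC }

  /**
   * Returns the shuffle key columns ("name ASC/DESC") for the next window stage,
   * or null when there is no PARTITION BY, meaning the data is already
   * partitioned correctly and the previous stage's shuffle spec is kept.
   */
  static List<String> shuffleKeyForWindow(List<String> partitionColumns, Map<String, Direction> sortDirections)
  {
    if (partitionColumns == null || partitionColumns.isEmpty()) {
      return null;
    }
    List<String> keyColumns = new ArrayList<>();
    for (String column : partitionColumns) {
      // DESC only when the sort operator explicitly sorts this column descending.
      Direction direction = sortDirections.getOrDefault(column, Direction.ASC);
      keyColumns.add(column + " " + direction);
    }
    return keyColumns;
  }

  public static void main(String[] args)
  {
    System.out.println(shuffleKeyForWindow(
        Arrays.asList("countryName", "cityName"),
        Collections.singletonMap("cityName", Direction.DESC)
    )); // [countryName ASC, cityName DESC]
    System.out.println(shuffleKeyForWindow(Collections.emptyList(), Collections.emptyMap())); // null
  }
}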

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.querykit;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class WindowOperatorQueryFrameProcessorFactoryTest
{
@Test
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class)
.withNonnullFields("query", "operatorList", "stageRowSignature", "isEmptyOver", "maxRowsMaterializedInWindow", "partitionColumnNames")
.usingGetClass()
.verify();
}
}

View File

@ -39,6 +39,7 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
@ -62,12 +63,14 @@ import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
@ -91,6 +94,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
@ -99,6 +103,7 @@ import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5;
import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
@ -205,6 +210,17 @@ public class CalciteMSQTestsHelper
{
final QueryableIndex index;
switch (segmentId.getDataSource()) {
case WIKIPEDIA:
try {
final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", UUID.randomUUID()));
final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex();
TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null);
index = TestIndex.INDEX_IO.loadIndex(directory);
}
catch (Exception e) {
throw new RuntimeException(e);
}
break;
case DATASOURCE1:
IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder()
.withMetrics(

View File

@ -126,7 +126,7 @@ public interface Operator
*/
STOP,
/**
* Inidcates that the downstream processing should pause its pushing of results and instead return a
* Indicates that the downstream processing should pause its pushing of results and instead return a
* continuation object that encapsulates whatever state is required to resume processing. When this signal is
* received, Operators that are generating data might choose to exert backpressure or otherwise pause their
* processing efforts until called again with the returned continuation object.

View File

@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class ComposingProcessor implements Processor
{
@ -37,6 +39,16 @@ public class ComposingProcessor implements Processor
this.processors = processors;
}
@Override
public List<String> getOutputColumnNames()
{
List<String> outputColumnNames = new ArrayList<>();
for (Processor processor : processors) {
outputColumnNames.addAll(processor.getOutputColumnNames());
}
return outputColumnNames;
}
@JsonProperty("processors")
public Processor[] getProcessors()
{

View File

@ -31,6 +31,8 @@ import org.apache.druid.query.operator.window.value.WindowLastProcessor;
import org.apache.druid.query.operator.window.value.WindowOffsetProcessor;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.List;
/**
* A Processor is a bit of logic that processes a single RowsAndColumns object to produce a new RowsAndColumns
* object. Generally speaking, it is used to add or alter columns in a batch-oriented fashion.
@ -80,4 +82,9 @@ public interface Processor
* @return boolean identifying if these processors should be considered equivalent to each other.
*/
boolean validateEquivalent(Processor otherProcessor);
/**
* @return List of output column names for the Processor.
*/
List<String> getOutputColumnNames();
}
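As a usage illustration (a hypothetical caller, not code from this patch), the new getOutputColumnNames() method lets a planner collect the columns each window processor will append and extend the input row signature with them, which is how the MSQ query kit builds the signature of a window stage. This sketch tracks names only and simply skips duplicates, whereas the query kit above also tracks types and treats an unexpectedly already-present output column as an error.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class StageSignatureSketch
{
  /**
   * Extends the input column list with each processor's output columns,
   * preserving order and skipping columns that are already present.
   */
  static List<String> stageColumns(List<String> inputColumns, List<List<String>> outputColumnNamesPerProcessor)
  {
    Set<String> columns = new LinkedHashSet<>(inputColumns);
    for (List<String> outputColumnNames : outputColumnNamesPerProcessor) {
      columns.addAll(outputColumnNames);
    }
    return new ArrayList<>(columns);
  }

  public static void main(String[] args)
  {
    List<String> input = Arrays.asList("countryName", "cityName", "added");
    List<List<String>> processorOutputs = Arrays.asList(
        Arrays.asList("w0"),      // e.g. count(added) OVER (...)
        Arrays.asList("w1", "w2") // e.g. sum(added) OVER (...), ROW_NUMBER() OVER (...)
    );
    System.out.println(stageColumns(input, processorOutputs));
    // [countryName, cityName, added, w0, w1, w2]
  }
}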

View File

@ -27,7 +27,9 @@ import org.apache.druid.query.rowsandcols.semantic.DefaultFramedOnHeapAggregatab
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class WindowFramedAggregateProcessor implements Processor
@ -45,6 +47,16 @@ public class WindowFramedAggregateProcessor implements Processor
private final WindowFrame frame;
private final AggregatorFactory[] aggregations;
@Override
public List<String> getOutputColumnNames()
{
List<String> outputColumnNames = new ArrayList<>();
for (AggregatorFactory aggregation : aggregations) {
outputColumnNames.add(aggregation.getName());
}
return outputColumnNames;
}
@JsonCreator
public WindowFramedAggregateProcessor(
@JsonProperty("frame") WindowFrame frame,

View File

@ -28,12 +28,20 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class WindowPercentileProcessor implements Processor
{
private final int numBuckets;
private final String outputColumn;
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
@JsonCreator
public WindowPercentileProcessor(
@JsonProperty("outputColumn") String outputColumn,

View File

@ -27,6 +27,7 @@ import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
@ -124,4 +125,9 @@ public abstract class WindowRankingProcessorBase implements Processor
return Objects.equals(groupingCols, other.groupingCols) && Objects.equals(outputColumn, other.outputColumn);
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
}

View File

@ -28,6 +28,9 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import java.util.Collections;
import java.util.List;
public class WindowRowNumberProcessor implements Processor
{
private final String outputColumn;
@ -128,4 +131,10 @@ public class WindowRowNumberProcessor implements Processor
"outputColumn='" + outputColumn + '\'' +
'}';
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
}

View File

@ -26,6 +26,8 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
public abstract class WindowValueProcessorBase implements Processor
@ -100,4 +102,10 @@ public abstract class WindowValueProcessorBase implements Processor
return "inputColumn=" + inputColumn +
", outputColumn='" + outputColumn + '\'';
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
}

View File

@ -27,6 +27,9 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
public class WindowProcessorOperatorTest
{
@Test
@ -53,6 +56,12 @@ public class WindowProcessorOperatorTest
{
return true;
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.emptyList();
}
},
InlineScanOperator.make(rac)
);

View File

@ -23,6 +23,9 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
public class ComposingProcessorTest
{
@Test
@ -32,6 +35,7 @@ public class ComposingProcessorTest
final ProcessorForTesting secondProcessor = new ProcessorForTesting();
ComposingProcessor proc = new ComposingProcessor(firstProcessor, secondProcessor);
Assert.assertTrue(proc.getOutputColumnNames().isEmpty());
proc.process(null);
Assert.assertEquals(1, firstProcessor.processCounter);
@ -70,5 +74,11 @@ public class ComposingProcessorTest
++validateCounter;
return validationResult;
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.emptyList();
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
@ -51,6 +52,7 @@ public class WindowFramedAggregateProcessorTest
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
};
WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs);
Assert.assertEquals(ImmutableList.of("cummMax", "cummSum"), proc.getOutputColumnNames());
final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of(
"yay", new IntArrayColumn(new int[]{1, 2, 3})

View File

@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
@ -42,6 +43,7 @@ public class WindowCumeDistProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
Processor processor = new WindowCumeDistProcessor(Collections.singletonList("vals"), "CumeDist");
Assert.assertEquals(Collections.singletonList("CumeDist"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})

View File

@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
@ -42,6 +43,7 @@ public class WindowDenseRankProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
Processor processor = new WindowDenseRankProcessor(Collections.singletonList("vals"), "DenseRank");
Assert.assertEquals(Collections.singletonList("DenseRank"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.ranking;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
@ -29,6 +30,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.LinkedHashMap;
@ -63,6 +65,11 @@ public class WindowPercentileProcessorTest
new WindowPercentileProcessor("10292", 10292)
);
Assert.assertEquals(
ImmutableList.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "10292"),
processor.getOutputColumnNames()
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.ranking;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
@ -26,6 +27,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
@ -49,6 +51,8 @@ public class WindowRankProcessorTest
new WindowRankProcessor(orderingCols, "rankAsPercent", true)
);
Assert.assertEquals(ImmutableList.of("rank", "rankAsPercent"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})
.expectColumn("rank", new int[]{1, 2, 2, 4, 5, 6, 7, 7, 9, 9})

View File

@ -28,8 +28,10 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@ -49,6 +51,7 @@ public class WindowRowNumberProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
Processor processor = new WindowRowNumberProcessor("rowRow");
Assert.assertEquals(Collections.singletonList("rowRow"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.value;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.LinkedHashMap;
@ -59,6 +61,11 @@ public class WindowFirstProcessorTest
new WindowFirstProcessor("nullFirstCol", "NullFirstCol")
);
Assert.assertEquals(
ImmutableList.of("FirstIntCol", "FirstDoubleCol", "FirstObjectCol", "NullFirstCol"),
processor.getOutputColumnNames()
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.value;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.LinkedHashMap;
@ -58,6 +60,10 @@ public class WindowLastProcessorTest
new WindowLastProcessor("objectCol", "LastObjectCol"),
new WindowLastProcessor("nullLastCol", "NullLastCol")
);
Assert.assertEquals(
ImmutableList.of("LastIntCol", "LastDoubleCol", "LastObjectCol", "NullLastCol"),
processor.getOutputColumnNames()
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()

View File

@ -7533,4 +7533,78 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
{
windowQueryTest();
}
/*
Druid query tests
*/
@DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1")
@Test
public void test_same_window_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1_named_window")
@Test
public void test_same_window_wikipedia_query_1_named_window()
{
windowQueryTest();
}
@DrillTest("druid_queries/multiple_windows/wikipedia_query_1")
@Test
public void test_multiple_windows_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/multiple_windows/wikipedia_query_1_named_windows")
@Test
public void test_multiple_windows_wikipedia_query_1_named_windows()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_1")
@Test
public void test_shuffle_columns_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1")
@Test
public void test_shuffle_columns_wikipedia_query_1_shuffle_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_2")
@Test
public void test_shuffle_columns_wikipedia_query_2()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1")
@Test
public void test_shuffle_columns_wikipedia_query_2_shuffle_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_1")
@Test
public void test_partition_by_multiple_columns_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_2")
@Test
public void test_partition_by_multiple_columns_wikipedia_query_2()
{
windowQueryTest();
}
}

View File

@ -0,0 +1,13 @@
null Austria 1 1
null Republic of Korea 1 2
null Republic of Korea 2 3
null Republic of Korea 3 4
Horsching Austria 2 1
Jeonju Republic of Korea 4 1
Seongnam-si Republic of Korea 5 1
Seoul Republic of Korea 6 1
Suwon-si Republic of Korea 7 1
Vienna Austria 3 1
Vienna Austria 4 2
Vienna Austria 5 3
Yongsan-dong Republic of Korea 8 1

View File

@ -0,0 +1,6 @@
select cityName, countryName,
row_number() over (partition by countryName order by countryName, cityName, channel) as c1,
count(channel) over (partition by cityName order by countryName, cityName, channel) as c2
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel

View File

@ -0,0 +1,13 @@
null Austria 1 1
null Republic of Korea 1 2
null Republic of Korea 2 3
null Republic of Korea 3 4
Horsching Austria 2 1
Jeonju Republic of Korea 4 1
Seongnam-si Republic of Korea 5 1
Seoul Republic of Korea 6 1
Suwon-si Republic of Korea 7 1
Vienna Austria 3 1
Vienna Austria 4 2
Vienna Austria 5 3
Yongsan-dong Republic of Korea 8 1

View File

@ -0,0 +1,9 @@
select cityName, countryName,
row_number() over w1 as c1,
count(channel) over w2 as c2
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel
WINDOW
w1 AS (partition by countryName order by countryName, cityName, channel),
w2 AS (partition by cityName order by countryName, cityName, channel)

View File

@ -0,0 +1,15 @@
Austria null 94 7
Austria null 4685 7
Austria null 14 7
Austria null 0 7
Austria null 272 7
Austria null 0 7
Austria null 6979 7
Guatemala null 0 1
Guatemala El Salvador 1 1
Guatemala Guatemala City 173 1
Austria Horsching 0 1
Austria Vienna 93 4
Austria Vienna 72 4
Austria Vienna 0 4
Austria Vienna 0 4

View File

@ -0,0 +1,7 @@
SELECT
countryName,
cityName,
added,
count(added) OVER (PARTITION BY countryName, cityName)
FROM "wikipedia"
where countryName in ('Guatemala', 'Austria')

View File

@ -0,0 +1,15 @@
Austria null 0 7 12044 1
Austria null 0 7 12044 2
Austria null 14 7 12044 1
Austria null 94 7 12044 1
Austria null 272 7 12044 1
Austria null 4685 7 12044 1
Austria null 6979 7 12044 1
Guatemala null 0 1 0 1
Guatemala El Salvador 1 1 1 1
Guatemala Guatemala City 173 1 173 1
Austria Horsching 0 1 0 1
Austria Vienna 0 4 165 1
Austria Vienna 0 4 165 2
Austria Vienna 72 4 165 1
Austria Vienna 93 4 165 1

View File

@ -0,0 +1,9 @@
SELECT
countryName,
cityName,
added,
count(added) OVER (PARTITION BY countryName, cityName),
sum(added) OVER (PARTITION BY countryName, cityName),
ROW_NUMBER() OVER (PARTITION BY countryName, cityName, added)
FROM "wikipedia"
where countryName in ('Guatemala', 'Austria')

View File

@ -0,0 +1,15 @@
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Guatemala 167 7 174
Guatemala 167 7 174
Guatemala 167 7 174

View File

@ -0,0 +1,6 @@
SELECT countryName,
sum("deleted") OVER (PARTITION BY countryName) as count_c3,
sum(delta) OVER (PARTITION BY countryName) as count_c1,
sum(added) OVER (PARTITION BY countryName) as count_c2
FROM "wikipedia"
where countryName in ('Guatemala', 'Austria')

View File

@ -0,0 +1,15 @@
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Austria 162 12047 12209
Guatemala 167 7 174
Guatemala 167 7 174
Guatemala 167 7 174

View File

@ -0,0 +1,7 @@
SELECT countryName,
sum("deleted") OVER w as count_c3,
sum(delta) OVER w as count_c1,
sum(added) OVER w as count_c2
FROM "wikipedia"
where countryName in ('Guatemala', 'Austria')
WINDOW w AS (PARTITION BY countryName)

View File

@ -0,0 +1,15 @@
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Austria 1017.4166666666666
Guatemala 58
Guatemala 58
Guatemala 58

View File

@ -0,0 +1,5 @@
SELECT
countryName,
AVG(added) OVER(PARTITION BY countryName)
FROM wikipedia
where countryName in ('Guatemala', 'Austria')

View File

@ -0,0 +1,15 @@
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
1017.4166666666666 Austria
58 Guatemala
58 Guatemala
58 Guatemala

View File

@ -0,0 +1,5 @@
SELECT
AVG(added) OVER(PARTITION BY countryName),
countryName
FROM wikipedia
where countryName in ('Guatemala', 'Austria')

View File

@ -0,0 +1,16 @@
Austria null 1 #de.wikipedia 1
Guatemala null 1 #es.wikipedia 2
Republic of Korea null 1 #en.wikipedia 3
Republic of Korea null 2 #ja.wikipedia 4
Republic of Korea null 3 #ko.wikipedia 5
Guatemala El Salvador 2 #es.wikipedia 1
Guatemala Guatemala City 3 #es.wikipedia 1
Austria Horsching 2 #de.wikipedia 1
Republic of Korea Jeonju 4 #ko.wikipedia 1
Republic of Korea Seongnam-si 5 #ko.wikipedia 1
Republic of Korea Seoul 6 #ko.wikipedia 1
Republic of Korea Suwon-si 7 #ko.wikipedia 1
Austria Vienna 3 #de.wikipedia 1
Austria Vienna 4 #es.wikipedia 2
Austria Vienna 5 #tr.wikipedia 3
Republic of Korea Yongsan-dong 8 #ko.wikipedia 1

View File

@ -0,0 +1,9 @@
SELECT
countryName,
cityName,
ROW_NUMBER() OVER(PARTITION BY countryName),
channel,
COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel)
FROM wikipedia
where countryName in ('Guatemala', 'Austria', 'Republic of Korea')
group by countryName, cityName, channel

View File

@ -0,0 +1,16 @@
1 Austria null 1 #de.wikipedia
1 Guatemala null 2 #es.wikipedia
1 Republic of Korea null 3 #en.wikipedia
2 Republic of Korea null 4 #ja.wikipedia
3 Republic of Korea null 5 #ko.wikipedia
2 Guatemala El Salvador 1 #es.wikipedia
3 Guatemala Guatemala City 1 #es.wikipedia
2 Austria Horsching 1 #de.wikipedia
4 Republic of Korea Jeonju 1 #ko.wikipedia
5 Republic of Korea Seongnam-si 1 #ko.wikipedia
6 Republic of Korea Seoul 1 #ko.wikipedia
7 Republic of Korea Suwon-si 1 #ko.wikipedia
3 Austria Vienna 1 #de.wikipedia
4 Austria Vienna 2 #es.wikipedia
5 Austria Vienna 3 #tr.wikipedia
8 Republic of Korea Yongsan-dong 1 #ko.wikipedia

View File

@ -0,0 +1,9 @@
SELECT
ROW_NUMBER() OVER(PARTITION BY countryName),
countryName,
cityName,
COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel),
channel
FROM wikipedia
where countryName in ('Guatemala', 'Austria', 'Republic of Korea')
group by countryName, cityName, channel