Add Sort Operator for Window Functions (#13619)

* Addition of NaiveSortMaker and default implementation

Add the NaiveSortMaker interface, which makes a sorter
object, along with a default implementation of that
interface.

This also allows us to plan multiple different window 
definitions on the same query.
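
As a rough sketch of the intended flow (hypothetical snippet, not part
of the diff; the column name and data are invented):

RowsAndColumns rac = MapOfColumnsRowsAndColumns.of("ints", new IntArrayColumn(new int[]{3, 1, 2}));
NaiveSortMaker.NaiveSorter sorter = NaiveSortMaker.fromRAC(rac).make(ColumnWithDirection.ascending("ints"));
RowsAndColumns sorted = sorter.complete(); // rows now come back as 1, 2, 3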
imply-cheddar 2023-01-06 17:27:18 +09:00 committed by GitHub
parent 4ee4d99b8d
commit f1821a7c18
33 changed files with 2553 additions and 178 deletions

View File

@ -65,7 +65,11 @@ public class MSQWarningsTest extends MSQTestBase
@Before
public void setUp3() throws IOException
{
toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/unparseable.gz");
File tempFile = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/unparseable.gz");
// Rename the file, changing its extension from .tmp to .gz, to prevent issues with 'parsing' the file
toRead = new File(tempFile.getParentFile(), "unparseable.gz");
tempFile.renameTo(toRead);
toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
rowSignature = RowSignature.builder()

View File

@ -197,17 +197,17 @@ import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2;
* Base test runner for running MSQ unit tests. It sets up multi stage query execution environment
* and populates data for the datasources. The runner does not go via the HTTP layer for communication between the
* various MSQ processes.
*
* <p>
* Controller -> Coordinator (Coordinator is mocked)
*
* <p>
* In the UTs, we go from:
* {@link MSQTaskQueryMaker} -> {@link MSQTestOverlordServiceClient} -> {@link Controller}
*
*
* <p>
* <p>
* Controller -> Worker communication happens in {@link MSQTestControllerContext}
*
* <p>
* Worker -> Controller communication happens in {@link MSQTestControllerClient}
*
* <p>
* Controller -> Overlord communication happens in {@link MSQTestTaskActionClient}
*/
public class MSQTestBase extends BaseCalciteQueryTest
@ -258,7 +258,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
{
super.configureGuice(builder);
builder.addModule(new DruidModule() {
builder.addModule(new DruidModule()
{
// Small subset of MsqSqlModule
@Override

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class ColumnWithDirection
{
public static ColumnWithDirection ascending(String column)
{
return new ColumnWithDirection(column, Direction.ASC);
}
public static ColumnWithDirection descending(String column)
{
return new ColumnWithDirection(column, Direction.DESC);
}
public enum Direction
{
ASC(1),
DESC(-1);
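// The int acts as a multiplier on comparator results: +1 preserves the natural
// (ascending) order, -1 inverts it for a descending sort.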
private final int directionInt;
Direction(int directionInt)
{
this.directionInt = directionInt;
}
public int getDirectionInt()
{
return directionInt;
}
}
private final String columnName;
private final Direction direction;
@JsonCreator
public ColumnWithDirection(
@JsonProperty("column") String columnName,
@JsonProperty("direction") Direction direction
)
{
this.columnName = columnName;
this.direction = direction;
}
@JsonProperty("column")
public String getColumn()
{
return columnName;
}
@JsonProperty("direction")
public Direction getDirection()
{
return direction;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof ColumnWithDirection)) {
return false;
}
ColumnWithDirection that = (ColumnWithDirection) o;
return Objects.equals(columnName, that.columnName) && direction == that.direction;
}
@Override
public int hashCode()
{
return Objects.hash(columnName, direction);
}
@Override
public String toString()
{
return "ColumnWithDirection{" +
"columnName='" + columnName + '\'' +
", direction=" + direction +
'}';
}
}
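
For reference, a hedged sketch of the JSON shape implied by the annotations above (assuming a vanilla Jackson ObjectMapper; the column name is invented):

ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(ColumnWithDirection.descending("d1"));
// json: {"column":"d1","direction":"DESC"}
ColumnWithDirection back = mapper.readValue(json, ColumnWithDirection.class);
// back.equals(ColumnWithDirection.descending("d1")) is true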

View File

@ -20,8 +20,8 @@
package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
import java.util.Iterator;
import java.util.List;
@ -60,9 +60,9 @@ public class NaivePartitioningOperator implements Operator
@Override
public boolean push(RowsAndColumns rac)
{
SortedGroupPartitioner groupPartitioner = rac.as(SortedGroupPartitioner.class);
ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
if (groupPartitioner == null) {
groupPartitioner = new DefaultSortedGroupPartitioner(rac);
groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
}
partitionsIter = groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker;
import java.util.ArrayList;
/**
* A naive sort operator sorts a stream of data in-place. Generally speaking, this means that it must
* accumulate all of the data from its child operator before it can sort. Given this limitation, the hope is
* that this operator is only planned in a very small number of circumstances.
*/
public class NaiveSortOperator implements Operator
{
private final Operator child;
private final ArrayList<ColumnWithDirection> sortColumns;
public NaiveSortOperator(
Operator child,
ArrayList<ColumnWithDirection> sortColumns
)
{
this.child = child;
this.sortColumns = sortColumns;
}
@Override
public void go(Receiver receiver)
{
child.go(
new Receiver()
{
NaiveSortMaker.NaiveSorter sorter = null;
@Override
public boolean push(RowsAndColumns rac)
{
if (sorter == null) {
sorter = NaiveSortMaker.fromRAC(rac).make(sortColumns);
} else {
sorter.moreData(rac);
}
return true;
}
@Override
public void completed()
{
receiver.push(sorter.complete());
receiver.completed();
}
}
);
}
}
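
A hypothetical wiring of this operator over an in-memory scan (InlineScanOperator appears in the tests below; the data, column name, and the assumption that push/completed live on a nested Operator.Receiver interface are all illustrative):

Operator scan = InlineScanOperator.make(inputRac);
Operator sort = new NaiveSortOperator(scan, Lists.newArrayList(ColumnWithDirection.ascending("sorted")));
sort.go(new Operator.Receiver() {
  @Override
  public boolean push(RowsAndColumns rac) {
    // the single, fully sorted RowsAndColumns arrives here once the child completes
    return true;
  }

  @Override
  public void completed() {
  }
});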

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
public class NaiveSortOperatorFactory implements OperatorFactory
{
private final ArrayList<ColumnWithDirection> sortColumns;
@JsonCreator
public NaiveSortOperatorFactory(
@JsonProperty("columns") ArrayList<ColumnWithDirection> sortColumns
)
{
this.sortColumns = sortColumns;
}
@JsonProperty("columns")
public ArrayList<ColumnWithDirection> getSortColumns()
{
return sortColumns;
}
@Override
public Operator wrap(Operator op)
{
return new NaiveSortOperator(op, sortColumns);
}
@Override
public boolean validateEquivalent(OperatorFactory other)
{
if (other instanceof NaiveSortOperatorFactory) {
return sortColumns.equals(((NaiveSortOperatorFactory) other).getSortColumns());
}
return false;
}
}
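
Usage sketch (the upstream operator is hypothetical): the factory simply decorates whatever operator precedes it in the pipeline:

NaiveSortOperatorFactory factory = new NaiveSortOperatorFactory(
    Lists.newArrayList(ColumnWithDirection.ascending("d1"))
);
Operator sorted = factory.wrap(upstream); // yields a NaiveSortOperator over 'upstream'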

View File

@ -53,7 +53,7 @@ public interface Operator
{
/**
* Used to push data. Return value indicates if more data will be accepted. If false, push should not
* be called anymore.
* be called anymore. Calling push after it has returned false results in undefined behavior.
*
* @param rac {@link RowsAndColumns} of data
* @return a boolean value indicating if more data will be accepted. If false, push should never be called
@ -62,7 +62,12 @@ public interface Operator
boolean push(RowsAndColumns rac);
/**
* Used to indicate that no more data will ever come
* Used to indicate that no more data will ever come. This is only used during the happy path and is not
* equivalent to a {@link java.io.Closeable#close()} method. Namely, there is no guarantee that this method
* will be called if execution halts due to an exception from push.
*
* It is acceptable for an implementation to eagerly close resources from this method, but it is not acceptable
* for this method to be the sole means of managing the lifecycle of resources held by the Operator.
*/
void completed();
}
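
To make that contract concrete, a minimal sketch of a receiver that stops accepting data early (hypothetical; the limit is invented, and the methods are assumed to sit on the nested Receiver interface, as the operators above suggest):

Operator.Receiver limited = new Operator.Receiver() {
  private int rowsSeen = 0;

  @Override
  public boolean push(RowsAndColumns rac) {
    rowsSeen += rac.numRows();
    return rowsSeen < 1000; // once false is returned, push must not be called again
  }

  @Override
  public void completed() {
    // happy-path only: not guaranteed to run if push threw an exception
  }
};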

View File

@ -30,6 +30,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "naivePartition", value = NaivePartitioningOperatorFactory.class),
@JsonSubTypes.Type(name = "naiveSort", value = NaiveSortOperatorFactory.class),
@JsonSubTypes.Type(name = "window", value = WindowOperatorFactory.class),
})
public interface OperatorFactory
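
Given the registration above, a naiveSort factory should round-trip through JSON along these lines (hedged sketch; the mapper and column name are assumptions):

OperatorFactory fromJson = mapper.readValue(
    "{\"type\":\"naiveSort\",\"columns\":[{\"column\":\"d1\",\"direction\":\"ASC\"}]}",
    OperatorFactory.class
); // yields a NaiveSortOperatorFactory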

View File

@ -35,6 +35,7 @@ import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -65,6 +66,11 @@ public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>
super(dataSource, new LegacySegmentSpec(Intervals.ETERNITY), false, context);
this.rowSignature = rowSignature;
this.operators = operators;
// At this point, we can also reach into a QueryDataSource and validate that the ordering expected by the
// partitioning at least aligns with the ordering coming from the underlying query. We unfortunately don't
// have enough information to validate that the underlying ordering aligns with expectations for the actual
// window operator queries, but maybe we could get that and validate it here too.
if (!(dataSource instanceof QueryDataSource || dataSource instanceof InlineDataSource)) {
throw new IAE("WindowOperatorQuery must run on top of a query or inline data source, got [%s]", dataSource);
}
@ -89,6 +95,7 @@ public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>
}
@Override
@Nullable
public DimFilter getFilter()
{
return null;

View File

@ -24,8 +24,8 @@ import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
import java.util.List;
import java.util.function.Function;
@ -67,9 +67,9 @@ public abstract class WindowRankingProcessorBase implements Processor
{
final AppendableRowsAndColumns retVal = RowsAndColumns.expectAppendable(incomingPartition);
SortedGroupPartitioner groupPartitioner = incomingPartition.as(SortedGroupPartitioner.class);
ClusteredGroupPartitioner groupPartitioner = incomingPartition.as(ClusteredGroupPartitioner.class);
if (groupPartitioner == null) {
groupPartitioner = new DefaultSortedGroupPartitioner(incomingPartition);
groupPartitioner = new DefaultClusteredGroupPartitioner(incomingPartition);
}
retVal.addColumn(outputColumn, fn.apply(groupPartitioner.computeBoundaries(groupingCols)));

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
public class ConcatRowsAndColumns implements RowsAndColumns
{
private final ArrayList<RowsAndColumns> racBuffer;
private final Map<String, Column> columnCache = new LinkedHashMap<>();
private final int[][] rowPointers;
private final int numRows;
public ConcatRowsAndColumns(
ArrayList<RowsAndColumns> racBuffer
)
{
this.racBuffer = racBuffer;
int numRows = 0;
for (RowsAndColumns rac : racBuffer) {
numRows += rac.numRows();
}
this.numRows = numRows;
this.rowPointers = new int[2][numRows];
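// rowPointers[0][r] records which chunk of racBuffer contains global row r;
// rowPointers[1][r] records that row's index within the chunk.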
int index = 0;
for (int i = 0; i < racBuffer.size(); ++i) {
final RowsAndColumns rac = racBuffer.get(i);
Arrays.fill(rowPointers[0], index, index + rac.numRows(), i);
for (int j = 0; j < rac.numRows(); ++j) {
rowPointers[1][index + j] = j;
}
index += rac.numRows();
}
}
@Override
public Collection<String> getColumnNames()
{
return racBuffer.get(0).getColumnNames();
}
@Override
public int numRows()
{
return numRows;
}
@Override
@Nullable
public Column findColumn(String name)
{
if (columnCache.containsKey(name)) {
return columnCache.get(name);
} else {
final Column firstCol = racBuffer.get(0).findColumn(name);
if (firstCol == null) {
for (int i = 1; i < racBuffer.size(); ++i) {
RowsAndColumns rac = racBuffer.get(i);
if (rac.findColumn(name) != null) {
throw new ISE("Column[%s] was not always null...", name);
}
}
columnCache.put(name, null);
return null;
} else {
ArrayList<ColumnAccessor> accessors = new ArrayList<>(racBuffer.size());
final ColumnAccessor firstAccessor = firstCol.toAccessor();
accessors.add(firstAccessor);
final ColumnType type = firstAccessor.getType();
for (int i = 1; i < racBuffer.size(); ++i) {
RowsAndColumns rac = racBuffer.get(i);
final Column col = rac.findColumn(name);
final ColumnAccessor accessor = col.toAccessor();
if (!type.equals(accessor.getType())) {
throw new ISE("Type mismatch, expected[%s], got[%s] on entry[%,d]", type, accessor.getType(), i);
}
accessors.add(accessor);
}
final ConcatedidColumn retVal = new ConcatedidColumn(type, type.getStrategy(), accessors);
columnCache.put(name, retVal);
return retVal;
}
}
}
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
private class ConcatedidColumn implements Column
{
private final ArrayList<ColumnAccessor> accessors;
private final ColumnType type;
private final Comparator<Object> comp;
public ConcatedidColumn(
ColumnType type,
Comparator<Object> comp,
ArrayList<ColumnAccessor> accessors
)
{
this.accessors = accessors;
this.type = type;
this.comp = comp;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new ColumnAccessor()
{
@Override
public ColumnType getType()
{
return type;
}
@Override
public int numRows()
{
return numRows;
}
@Override
public boolean isNull(int rowNum)
{
final ColumnAccessor localAccessor = getLocalAccessor(rowNum);
final int localIndex = getLocalIndex(rowNum);
return localAccessor.isNull(localIndex);
}
@Nullable
@Override
public Object getObject(int rowNum)
{
final ColumnAccessor localAccessor = getLocalAccessor(rowNum);
final int localIndex = getLocalIndex(rowNum);
return localAccessor.getObject(localIndex);
}
@Override
public double getDouble(int rowNum)
{
final ColumnAccessor localAccessor = getLocalAccessor(rowNum);
final int localIndex = getLocalIndex(rowNum);
return localAccessor.getDouble(localIndex);
}
@Override
public float getFloat(int rowNum)
{
final ColumnAccessor localAccessor = getLocalAccessor(rowNum);
final int localIndex = getLocalIndex(rowNum);
return localAccessor.getFloat(localIndex);
}
@Override
public long getLong(int rowNum)
{
final ColumnAccessor localAccessor = getLocalAccessor(rowNum);
final int localIndex = getLocalIndex(rowNum);
return localAccessor.getLong(localIndex);
}
@Override
public int getInt(int rowNum)
{
final ColumnAccessor localAccessor = getLocalAccessor(rowNum);
final int localIndex = getLocalIndex(rowNum);
return localAccessor.getInt(localIndex);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
final ColumnAccessor lhsAccessor = getLocalAccessor(lhsRowNum);
final int lhsIndex = getLocalIndex(lhsRowNum);
final ColumnAccessor rhsAccessor = getLocalAccessor(rhsRowNum);
final int rhsIndex = getLocalIndex(rhsRowNum);
return comp.compare(lhsAccessor.getObject(lhsIndex), rhsAccessor.getObject(rhsIndex));
}
private int getLocalIndex(int rowNum)
{
return rowPointers[1][rowNum];
}
private ColumnAccessor getLocalAccessor(int rowNum)
{
return accessors.get(rowPointers[0][rowNum]);
}
};
}
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}
}
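
A small, hypothetical illustration of the concatenation (column name and values invented):

ArrayList<RowsAndColumns> chunks = new ArrayList<>();
chunks.add(MapOfColumnsRowsAndColumns.of("ints", new IntArrayColumn(new int[]{0, 1, 2})));
chunks.add(MapOfColumnsRowsAndColumns.of("ints", new IntArrayColumn(new int[]{3, 4})));
ConcatRowsAndColumns concat = new ConcatRowsAndColumns(chunks);
// concat.numRows() == 5; global row 4 resolves to chunk 1, local row 1
int val = concat.findColumn("ints").toAccessor().getInt(4); // 4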

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.rowsandcols.column.Column;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
public class EmptyRowsAndColumns implements RowsAndColumns
{
@Override
public Collection<String> getColumnNames()
{
return new ArrayList<>();
}
@Override
public int numRows()
{
return 0;
}
@Override
public Column findColumn(String name)
{
return null;
}
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
public class RearrangedRowsAndColumns implements RowsAndColumns
{
private final Map<String, Column> columnCache = new LinkedHashMap<>();
private final int[] pointers;
private final RowsAndColumns rac;
public RearrangedRowsAndColumns(
int[] pointers,
RowsAndColumns rac
)
{
if (pointers.length != rac.numRows()) {
throw new IAE("length mismatch, pointers[%,d], rac[%,d]", pointers.length, rac.numRows());
}
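// pointers[i] identifies which row of the wrapped rac is exposed as row i of this view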
this.pointers = pointers;
this.rac = rac;
}
@Override
public Collection<String> getColumnNames()
{
return rac.getColumnNames();
}
@Override
public int numRows()
{
return pointers.length;
}
@Override
@Nullable
public Column findColumn(String name)
{
if (columnCache.containsKey(name)) {
return columnCache.get(name);
} else {
final Column column = rac.findColumn(name);
if (column == null) {
columnCache.put(name, null);
return null;
}
final ColumnAccessor accessor = column.toAccessor();
return new ColumnAccessorBasedColumn(
new ColumnAccessor()
{
@Override
public ColumnType getType()
{
return accessor.getType();
}
@Override
public int numRows()
{
return pointers.length;
}
@Override
public boolean isNull(int rowNum)
{
return accessor.isNull(pointers[rowNum]);
}
@Nullable
@Override
public Object getObject(int rowNum)
{
return accessor.getObject(pointers[rowNum]);
}
@Override
public double getDouble(int rowNum)
{
return accessor.getDouble(pointers[rowNum]);
}
@Override
public float getFloat(int rowNum)
{
return accessor.getFloat(pointers[rowNum]);
}
@Override
public long getLong(int rowNum)
{
return accessor.getLong(pointers[rowNum]);
}
@Override
public int getInt(int rowNum)
{
return accessor.getInt(pointers[rowNum]);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return accessor.compareRows(pointers[lhsRowNum], pointers[rhsRowNum]);
}
}
);
}
}
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}

View File

@ -95,6 +95,7 @@ public interface RowsAndColumns
* @param name the name of the column to find
* @return the Column, if found. null if not found.
*/
@Nullable
Column findColumn(String name);
/**

View File

@ -25,17 +25,21 @@ import java.util.ArrayList;
import java.util.List;
/**
* A semantic interface used to partition a data set based on a given set of dimensions.
* A semantic interface used to partition a data set based on a given set of columns.
* <p>
* This specifically assumes that it is working with sorted data and, as such, the groups returned
* This specifically assumes that it is working with pre-clustered data and, as such, the groups returned
* should be contiguous and unique (that is, all rows for a given combination of values exist in only one grouping)
*/
public interface SortedGroupPartitioner
public interface ClusteredGroupPartitioner
{
/**
* Computes and returns a list of contiguous boundaries for independent groups. All rows in a specific grouping
* should have the same values for the identified columns. Additionally, as this is assuming it is dealing with
* sorted data, there should only be a single entry in the return value for a given set of values of the columns.
* clustered data, there should only be a single entry in the return value for a given set of values of the columns.
* <p>
* Note that implementations are not expected to do any validation that the data is pre-clustered. There is no
* expectation that an implementation will identify that the same cluster existed non-contiguously. It is up to
* the caller to ensure that data is clustered correctly before invoking this method.
*
* @param columns the columns to partition on
* @return an int[] representing the start (inclusive) and stop (exclusive) offsets of boundaries. Boundaries are
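
A concrete illustration of the boundary format (hypothetical data, consistent with the tests later in this change):

RowsAndColumns rac = MapOfColumnsRowsAndColumns.of(
    "sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2})
);
int[] bounds = new DefaultClusteredGroupPartitioner(rac)
    .computeBoundaries(Collections.singletonList("sorted"));
// bounds == {0, 3, 5, 6}: groups [0,3), [3,5) and [5,6)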

View File

@ -21,7 +21,6 @@ package org.apache.druid.query.rowsandcols.semantic;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.LimitedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
@ -30,11 +29,11 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import java.util.ArrayList;
import java.util.List;
public class DefaultSortedGroupPartitioner implements SortedGroupPartitioner
public class DefaultClusteredGroupPartitioner implements ClusteredGroupPartitioner
{
private final RowsAndColumns rac;
public DefaultSortedGroupPartitioner(
public DefaultClusteredGroupPartitioner(
RowsAndColumns rac
)
{
@ -63,10 +62,8 @@ public class DefaultSortedGroupPartitioner implements SortedGroupPartitioner
int end = boundaries.getInt(i);
for (int j = start + 1; j < end; ++j) {
int comparison = accessor.compareRows(j - 1, j);
if (comparison < 0) {
if (comparison != 0) {
newBoundaries.add(j);
} else if (comparison > 0) {
throw new ISE("Pre-sorted data required, rows[%s] and [%s] were not in order", j - 1, j);
}
}
newBoundaries.add(end);
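
The behavioral change above means out-of-order values no longer throw: each contiguous run simply becomes its own group. A hedged sketch (invented data):

RowsAndColumns unsorted = MapOfColumnsRowsAndColumns.of(
    "c", new IntArrayColumn(new int[]{0, 0, 1, 0})
);
int[] bounds = new DefaultClusteredGroupPartitioner(unsorted)
    .computeBoundaries(Collections.singletonList("c"));
// bounds == {0, 2, 3, 4}: the two runs of 0 remain separate groups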

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import it.unimi.dsi.fastutil.Arrays;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.EmptyRowsAndColumns;
import org.apache.druid.query.rowsandcols.RearrangedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import java.util.ArrayList;
public class DefaultNaiveSortMaker implements NaiveSortMaker
{
private final RowsAndColumns rac;
public DefaultNaiveSortMaker(
RowsAndColumns rac
)
{
this.rac = rac;
}
@Override
public NaiveSorter make(ArrayList<ColumnWithDirection> ordering)
{
final NaiveSorter retVal = new DefaultNaiveSorter(ordering);
retVal.moreData(rac);
return retVal;
}
private static class DefaultNaiveSorter implements NaiveSorter
{
final ArrayList<RowsAndColumns> racBuffer;
private final ArrayList<ColumnWithDirection> ordering;
public DefaultNaiveSorter(ArrayList<ColumnWithDirection> ordering)
{
this.ordering = ordering;
racBuffer = new ArrayList<>();
}
@Override
public RowsAndColumns moreData(RowsAndColumns rac)
{
racBuffer.add(rac);
return null;
}
@Override
public RowsAndColumns complete()
{
if (racBuffer.isEmpty()) {
return new EmptyRowsAndColumns();
}
ConcatRowsAndColumns rac = new ConcatRowsAndColumns(racBuffer);
// Start from the identity mapping; the quickSort below rearranges these pointers into sorted order
int[] sortedPointers = new int[rac.numRows()];
for (int i = 0; i < sortedPointers.length; ++i) {
sortedPointers[i] = i;
}
int index = 0;
int[] direction = new int[ordering.size()];
ColumnAccessor[] accessors = new ColumnAccessor[ordering.size()];
for (ColumnWithDirection orderByColumnSpec : ordering) {
final Column col = rac.findColumn(orderByColumnSpec.getColumn());
// A missing column has the same value (null) for every row, so it is already sorted and can be skipped
if (col != null) {
accessors[index] = col.toAccessor();
direction[index] = orderByColumnSpec.getDirection().getDirectionInt();
++index;
}
}
final int numColsToCompare = index;
Arrays.quickSort(
0,
rac.numRows(),
(k1, k2) -> {
for (int i = 0; i < numColsToCompare; ++i) {
final ColumnAccessor accessy = accessors[i];
int val = accessy.compareRows(sortedPointers[k1], sortedPointers[k2]);
if (val != 0) {
return val * direction[i];
}
}
return 0;
},
(a, b) -> {
int bufPos = sortedPointers[a];
sortedPointers[a] = sortedPointers[b];
sortedPointers[b] = bufPos;
}
);
return new RearrangedRowsAndColumns(sortedPointers, rac);
}
}
}
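
The pointer-indirection sort can be seen in isolation in this self-contained sketch (invented data; the same fastutil primitive as above):

final int[] data = new int[]{30, 10, 20};
final int[] pointers = new int[]{0, 1, 2};
it.unimi.dsi.fastutil.Arrays.quickSort(
    0,
    data.length,
    (k1, k2) -> Integer.compare(data[pointers[k1]], data[pointers[k2]]),
    (a, b) -> {
      int tmp = pointers[a];
      pointers[a] = pointers[b];
      pointers[b] = tmp;
    }
);
// pointers is now {1, 2, 0}; reading data through pointers yields 10, 20, 30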

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.Lists;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.ArrayList;
/**
* A NaiveSorter sorts a stream of data in-place. In the worst case, that means it needs to buffer up all
* RowsAndColumns received before it can return anything. This semantic interface is set up to allow an
* implementation of RowsAndColumns to know that it is pre-sorted and potentially return sorted data early.
* <p>
* The default implementation cannot actually do this, however, so it is up to the specific concrete RowsAndColumns
* classes to provide their own implementations that can do this.
*/
public interface NaiveSortMaker
{
static NaiveSortMaker fromRAC(RowsAndColumns rac)
{
NaiveSortMaker retVal = rac.as(NaiveSortMaker.class);
if (retVal == null) {
retVal = new DefaultNaiveSortMaker(rac);
}
return retVal;
}
interface NaiveSorter
{
/**
* Adds more data to the sort. This method can optionally return a RowsAndColumns object. If it does return
* a RowsAndColumns object, any data included in the return is assumed to be in sorted-order.
*
* @param rac the data to include in the sort
* @return optionally, a RowsAndColumns object of data that is known to be in sorted order, null if nothing yet.
*/
RowsAndColumns moreData(RowsAndColumns rac);
/**
* Indicate that there is no more data coming.
*
* @return A RowsAndColumns object of sorted data that has not been returned already from {@link #moreData} calls.
*/
RowsAndColumns complete();
}
/**
* Makes the NaiveSorter that will actually do the sort. The {@code ColumnWithDirection} entries identify only
* which column should be sorted and in which direction; they carry no custom comparator, and implementations
* of the NaiveSorter interface should apply the natural ordering of each column's type.
*
* @param ordering a specification of which columns to sort in which direction
* @return a NaiveSorter that will sort according to the provided spec
*/
NaiveSorter make(ArrayList<ColumnWithDirection> ordering);
default NaiveSorter make(ColumnWithDirection... ordering)
{
return make(Lists.newArrayList(ordering));
}
}
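
A short sketch of the incremental contract (hypothetical chunks; the default implementation buffers everything and only returns data from complete()):

NaiveSortMaker.NaiveSorter sorter = NaiveSortMaker.fromRAC(firstChunk).make(
    ColumnWithDirection.ascending("ints"),
    ColumnWithDirection.descending("strs")
);
RowsAndColumns early = sorter.moreData(secondChunk); // null for the default implementation
RowsAndColumns all = sorter.complete();              // everything, fully sorted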

View File

@ -21,14 +21,14 @@ package org.apache.druid.query.operator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.function.BiFunction;
public class NaivePartitioningOperatorTest
{
@Test
@ -50,16 +50,20 @@ public class NaivePartitioningOperatorTest
.expectRowsAndColumns(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21}),
.expectColumn("unsorted", new int[]{3, 54, 21})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5}),
.expectColumn("unsorted", new int[]{1, 5})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{2})
.expectColumn("unsorted", new int[]{54}),
.expectColumn("unsorted", new int[]{54})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{4, 4, 4})
.expectColumn("unsorted", new int[]{2, 3, 92})
.allColumnsRegistered()
)
.runToCompletion(op);
}
@ -92,8 +96,15 @@ public class NaivePartitioningOperatorTest
}
@Test
public void testFailUnsorted()
public void testDoesNotValidateSort()
{
BiFunction<Integer, Integer, RowsAndColumnsHelper> singleHelperMaker =
(sorted, unsorted) ->
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{sorted})
.expectColumn("unsorted", new int[]{unsorted})
.allColumnsRegistered();
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
@ -106,22 +117,18 @@ public class NaivePartitioningOperatorTest
InlineScanOperator.make(rac)
);
boolean exceptionThrown = false;
try {
new OperatorTestHelper()
.withPushFn(
rac1 -> {
Assert.fail("I shouldn't be called, an exception should've been thrown.");
return true;
}
)
.runToCompletion(op);
}
catch (ISE ex) {
Assert.assertEquals("Pre-sorted data required, rows[1] and [2] were not in order", ex.getMessage());
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
new OperatorTestHelper()
.expectRowsAndColumns(
singleHelperMaker.apply(0, 3),
singleHelperMaker.apply(0, 54),
singleHelperMaker.apply(0, 21),
singleHelperMaker.apply(1, 1),
singleHelperMaker.apply(1, 5),
singleHelperMaker.apply(2, 54),
singleHelperMaker.apply(4, 2),
singleHelperMaker.apply(4, 3),
singleHelperMaker.apply(4, 92)
)
.runToCompletion(op);
}
}

View File

@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
public class RowsAndColumnsHelper
{
@ -70,6 +71,7 @@ public class RowsAndColumnsHelper
private final Map<String, ColumnHelper> helpers = new LinkedHashMap<>();
private Set<String> fullColumnSet;
private AtomicReference<Integer> expectedSize = new AtomicReference<>();
public RowsAndColumnsHelper()
{
@ -112,6 +114,10 @@ public class RowsAndColumnsHelper
public ColumnHelper columnHelper(String column, int expectedSize, ColumnType expectedType)
{
if (this.expectedSize.get() == null) {
this.expectedSize.set(expectedSize);
}
Assert.assertEquals("Columns should be defined with same size", this.expectedSize.get().intValue(), expectedSize);
ColumnHelper retVal = helpers.get(column);
if (retVal == null) {
retVal = new ColumnHelper(expectedSize, expectedType);

View File

@ -30,6 +30,7 @@ import org.junit.Test;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class WindowRankProcessorTest
@ -42,9 +43,10 @@ public class WindowRankProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
final List<String> orderingCols = Collections.singletonList("vals");
Processor processor = new ComposingProcessor(
new WindowRankProcessor(Collections.singletonList("vals"), "rank", false),
new WindowRankProcessor(Collections.singletonList("vals"), "rankAsPercent", true)
new WindowRankProcessor(orderingCols, "rank", false),
new WindowRankProcessor(orderingCols, "rankAsPercent", true)
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
@ -68,9 +70,10 @@ public class WindowRankProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
final List<String> orderingCols = Collections.singletonList("vals");
Processor processor = new ComposingProcessor(
new WindowRankProcessor(Collections.singletonList("vals"), "rank", false),
new WindowRankProcessor(Collections.singletonList("vals"), "rankAsPercent", true)
new WindowRankProcessor(orderingCols, "rank", false),
new WindowRankProcessor(orderingCols, "rankAsPercent", true)
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.operator.LimitedRowsAndColumns;
import java.util.ArrayList;
import java.util.function.Function;
public class ConcatRowsAndColumnsTest extends RowsAndColumnsTestBase
{
public ConcatRowsAndColumnsTest()
{
super(ConcatRowsAndColumns.class);
}
public static Function<MapOfColumnsRowsAndColumns, ConcatRowsAndColumns> MAKER = input -> {
int rowsPerChunk = Math.max(1, input.numRows() / 4);
ArrayList<RowsAndColumns> theRac = new ArrayList<>();
int startId = 0;
while (startId < input.numRows()) {
theRac.add(new LimitedRowsAndColumns(input, startId, Math.min(input.numRows(), startId + rowsPerChunk)));
startId += rowsPerChunk;
}
return new ConcatRowsAndColumns(theRac);
};
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.junit.Test;
import java.util.AbstractList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
public class RearrangedRowsAndColumnsTest extends RowsAndColumnsTestBase
{
public RearrangedRowsAndColumnsTest()
{
super(RearrangedRowsAndColumns.class);
}
public static Function<MapOfColumnsRowsAndColumns, RearrangedRowsAndColumns> MAKER = input -> {
int[] pointers = new int[input.numRows()];
for (int i = 0; i < pointers.length; ++i) {
pointers[i] = i;
}
shuffleArray(pointers);
Map<String, Column> shuffledMap = new LinkedHashMap<>();
for (String columnName : input.getColumnNames()) {
final ColumnAccessor accessor = input.findColumn(columnName).toAccessor();
Object[] theVals = new Object[pointers.length];
for (int i = 0; i < pointers.length; ++i) {
theVals[pointers[i]] = accessor.getObject(i);
}
shuffledMap.put(columnName, new ObjectArrayColumn(theVals, accessor.getType()));
}
return new RearrangedRowsAndColumns(pointers, new MapOfColumnsRowsAndColumns(shuffledMap, pointers.length));
};
@SuppressWarnings("UnusedCollectionModifiedInPlace")
private static void shuffleArray(int[] pointers)
{
Collections.shuffle(new AbstractList<Object>()
{
@Override
public Object get(int index)
{
return pointers[index];
}
@Override
public Object set(int index, Object element)
{
int retVal = pointers[index];
pointers[index] = (Integer) element;
return retVal;
}
@Override
public int size()
{
return pointers.length;
}
});
}
@Test
public void testSanity()
{
new RowsAndColumnsHelper()
.expectColumn("int", new int[]{0, 1, 2, 3, 4})
.allColumnsRegistered()
.validate(
new RearrangedRowsAndColumns(
new int[]{4, 2, 0, 3, 1},
MapOfColumnsRowsAndColumns.of(
"int",
new IntArrayColumn(new int[]{2, 4, 1, 3, 0})
)
));
}
}

View File

@ -25,6 +25,8 @@ import org.apache.druid.java.util.common.ISE;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@ -35,14 +37,14 @@ import java.util.function.Function;
* <p>
* These test suites are combined a bit precariously, so there is work that the developer needs to do to make sure
* that things are wired up correctly. Specifically, a developer must register their RowsAndColumns implementation
* by adding an entry to the static {@link #getMakers()} method on this base class. The developer should *also*
* by adding an entry to the static {@link #makerFeeder()} method on this base class. The developer should *also*
* create a test class for their RowsAndColumns object that extends this class. By creating the test class that
* extends this class, there will be an extra validation done that ensures that the list of makers includes their
* RowsAndColumns class.
* <p>
* The semantic interfaces, on the other hand, should all create a test that extends
* {@link org.apache.druid.query.rowsandcols.semantic.SemanticTestBase}. That test sets up a parameterized test,
* using the results of {@link #getMakers()} to do the parameterization.
* using the results of {@link #makerFeeder()} to do the parameterization.
*/
public abstract class RowsAndColumnsTestBase
{
@ -54,14 +56,22 @@ public abstract class RowsAndColumnsTestBase
private static final AtomicReference<Iterable<Object[]>> MAKERS = new AtomicReference<>();
public static Iterable<Object[]> getMakers()
@Nonnull
private static ArrayList<Object[]> getMakers()
{
return Lists.newArrayList(
new Object[]{MapOfColumnsRowsAndColumns.class, Function.identity()},
new Object[]{ArrayListRowsAndColumns.class, ArrayListRowsAndColumnsTest.MAKER},
new Object[]{ConcatRowsAndColumns.class, ConcatRowsAndColumnsTest.MAKER},
new Object[]{RearrangedRowsAndColumns.class, RearrangedRowsAndColumnsTest.MAKER}
);
}
public static Iterable<Object[]> makerFeeder()
{
Iterable<Object[]> retVal = MAKERS.get();
if (retVal == null) {
retVal = Lists.newArrayList(
new Object[]{MapOfColumnsRowsAndColumns.class, Function.identity()},
new Object[]{ArrayListRowsAndColumns.class, ArrayListRowsAndColumnsTest.MAKER}
);
retVal = getMakers();
for (Object[] objects : retVal) {
Class<?> aClazz = (Class<?>) objects[0];
final String expectedName = aClazz.getName() + "Test";
@ -92,7 +102,7 @@ public abstract class RowsAndColumnsTestBase
public void testInListOfMakers()
{
boolean inList = false;
for (Object[] objs : getMakers()) {
for (Object[] objs : makerFeeder()) {
if (expectedClass.equals(objs[0])) {
inList = true;
break;

View File

@ -20,7 +20,6 @@
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
@ -32,11 +31,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
public class SortedGroupPartitionerTest extends SemanticTestBase
public class ClusteredGroupPartitionerTest extends SemanticTestBase
{
public SortedGroupPartitionerTest(
public ClusteredGroupPartitionerTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
@ -45,7 +45,7 @@ public class SortedGroupPartitionerTest extends SemanticTestBase
}
@Test
public void testDefaultSortedGroupPartitioner()
public void testDefaultClusteredGroupPartitioner()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
@ -54,9 +54,9 @@ public class SortedGroupPartitionerTest extends SemanticTestBase
)
));
SortedGroupPartitioner parter = rac.as(SortedGroupPartitioner.class);
ClusteredGroupPartitioner parter = rac.as(ClusteredGroupPartitioner.class);
if (parter == null) {
parter = new DefaultSortedGroupPartitioner(rac);
parter = new DefaultClusteredGroupPartitioner(rac);
}
int[] expectedBounds = new int[]{0, 3, 5, 6, 9};
@ -90,14 +90,31 @@ public class SortedGroupPartitionerTest extends SemanticTestBase
}
Assert.assertFalse(partedChunks.hasNext());
boolean exceptionThrown = false;
try {
parter.partitionOnBoundaries(Collections.singletonList("unsorted"));
BiFunction<Integer, Integer, RowsAndColumnsHelper> singleHelperMaker =
(sorted, unsorted) ->
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{sorted})
.expectColumn("unsorted", new int[]{unsorted})
.allColumnsRegistered();
List<RowsAndColumnsHelper> unsortedExpectations = Arrays.asList(
singleHelperMaker.apply(0, 3),
singleHelperMaker.apply(0, 54),
singleHelperMaker.apply(0, 21),
singleHelperMaker.apply(1, 1),
singleHelperMaker.apply(1, 5),
singleHelperMaker.apply(2, 54),
singleHelperMaker.apply(4, 2),
singleHelperMaker.apply(4, 3),
singleHelperMaker.apply(4, 92)
);
final List<String> unsorted = Collections.singletonList("unsorted");
final Iterator<RowsAndColumns> unsortedChunks = parter.partitionOnBoundaries(unsorted).iterator();
for (RowsAndColumnsHelper expectation : unsortedExpectations) {
Assert.assertTrue(unsortedChunks.hasNext());
expectation.validate(unsortedChunks.next());
}
catch (ISE ex) {
Assert.assertEquals("Pre-sorted data required, rows[1] and [2] were not in order", ex.getMessage());
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
Assert.assertFalse(unsortedChunks.hasNext());
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;
import java.util.function.Function;
public class NaiveSortMakerTest extends SemanticTestBase
{
public NaiveSortMakerTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
super(name, fn);
}
@Test
public void testSortMultipleChunks()
{
final RowsAndColumns first = make(MapOfColumnsRowsAndColumns.of(
"ints", new IntArrayColumn(new int[]{1, 7, 13, 0, 19}),
"strs", new ObjectArrayColumn(new Object[]{"b", "h", "n", "a", "t"}, ColumnType.STRING)
));
NaiveSortMaker maker = first.as(NaiveSortMaker.class);
if (maker == null) {
maker = new DefaultNaiveSortMaker(first);
}
final NaiveSortMaker.NaiveSorter intSorter = maker.make(
ColumnWithDirection.ascending("ints")
);
final NaiveSortMaker.NaiveSorter stringSorter = maker.make(
ColumnWithDirection.ascending("strs")
);
final NaiveSortMaker.NaiveSorter intSorterDesc = maker.make(
ColumnWithDirection.descending("ints")
);
final NaiveSortMaker.NaiveSorter stringSorterDesc = maker.make(
ColumnWithDirection.descending("strs")
);
RowsAndColumns intermediate = make(MapOfColumnsRowsAndColumns.of(
"ints", new IntArrayColumn(new int[]{2, 3, 16, 4, 5}),
"strs", new ObjectArrayColumn(new Object[]{"c", "d", "q", "e", "f"}, ColumnType.STRING)
));
intSorter.moreData(intermediate);
stringSorter.moreData(intermediate);
intSorterDesc.moreData(intermediate);
stringSorterDesc.moreData(intermediate);
intermediate = make(MapOfColumnsRowsAndColumns.of(
"ints", new IntArrayColumn(new int[]{10, 17, 12, 8, 14, 15}),
"strs", new ObjectArrayColumn(new Object[]{"k", "r", "m", "i", "o", "p"}, ColumnType.STRING)
));
intSorter.moreData(intermediate);
stringSorter.moreData(intermediate);
intSorterDesc.moreData(intermediate);
stringSorterDesc.moreData(intermediate);
intermediate = make(MapOfColumnsRowsAndColumns.of(
"ints", new IntArrayColumn(new int[]{6, 18, 11, 14, 9}),
"strs", new ObjectArrayColumn(new Object[]{"g", "s", "l", "o", "j"}, ColumnType.STRING)
));
intSorter.moreData(intermediate);
stringSorter.moreData(intermediate);
intSorterDesc.moreData(intermediate);
stringSorterDesc.moreData(intermediate);
final RowsAndColumnsHelper helper = new RowsAndColumnsHelper()
.expectColumn("ints", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 14, 15, 16, 17, 18, 19})
.expectColumn(
"strs",
new Object[]{
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "o", "p", "q", "r", "s", "t"
},
ColumnType.STRING
)
.allColumnsRegistered();
final RowsAndColumns intSorted = intSorter.complete();
helper.validate(intSorted);
final RowsAndColumns strSorted = stringSorter.complete();
helper.validate(strSorted);
final RowsAndColumnsHelper descendingHelper = new RowsAndColumnsHelper()
.expectColumn("ints", new int[]{19, 18, 17, 16, 15, 14, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0})
.expectColumn(
"strs",
new Object[]{
"t", "s", "r", "q", "p", "o", "o", "n", "m", "l", "k", "j", "i", "h", "g", "f", "e", "d", "c", "b", "a"
},
ColumnType.STRING
)
.allColumnsRegistered();
final RowsAndColumns intSortedDesc = intSorterDesc.complete();
descendingHelper.validate(intSortedDesc);
final RowsAndColumns strSortedDesc = stringSorterDesc.complete();
descendingHelper.validate(strSortedDesc);
}
@Test
public void testSortOneChunk()
{
final RowsAndColumns first = make(MapOfColumnsRowsAndColumns.of(
"ints", new IntArrayColumn(new int[]{1, 7, 13, 0, 19}),
"strs", new ObjectArrayColumn(new Object[]{"b", "h", "n", "a", "t"}, ColumnType.STRING)
));
NaiveSortMaker maker = first.as(NaiveSortMaker.class);
if (maker == null) {
maker = new DefaultNaiveSortMaker(first);
}
final NaiveSortMaker.NaiveSorter sorter = maker.make(
ColumnWithDirection.ascending("ints")
);
final NaiveSortMaker.NaiveSorter stringSorter = maker.make(
ColumnWithDirection.ascending("strs")
);
final NaiveSortMaker.NaiveSorter sorterDesc = maker.make(
ColumnWithDirection.descending("ints")
);
final NaiveSortMaker.NaiveSorter stringSorterDesc = maker.make(
ColumnWithDirection.descending("strs")
);
final RowsAndColumnsHelper helper = new RowsAndColumnsHelper()
.expectColumn("ints", new int[]{0, 1, 7, 13, 19})
.expectColumn("strs", new Object[]{"a", "b", "h", "n", "t"}, ColumnType.STRING)
.allColumnsRegistered();
final RowsAndColumns sorted = sorter.complete();
helper.validate(sorted);
final RowsAndColumns stringSorted = stringSorter.complete();
helper.validate(stringSorted);
final RowsAndColumnsHelper descendingHelper = new RowsAndColumnsHelper()
.expectColumn("ints", new int[]{19, 13, 7, 1, 0})
.expectColumn("strs", new Object[]{"t", "n", "h", "b", "a"}, ColumnType.STRING)
.allColumnsRegistered();
final RowsAndColumns sortedDesc = sorterDesc.complete();
descendingHelper.validate(sortedDesc);
final RowsAndColumns stringSortedDesc = stringSorterDesc.complete();
descendingHelper.validate(stringSortedDesc);
}
}

View File

@ -48,7 +48,7 @@ public abstract class SemanticTestBase
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> parameterFeed()
{
return FluentIterable.from(RowsAndColumnsTestBase.getMakers())
return FluentIterable.from(RowsAndColumnsTestBase.makerFeeder())
.transformAndConcat(input -> {
final String name = ((Class<?>) input[0]).getSimpleName();
return Arrays.asList(

View File

@ -32,8 +32,12 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.NaiveSortOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorFactory;
import org.apache.druid.query.operator.window.ComposingProcessor;
@ -59,12 +63,21 @@ import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
/**
* Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}.
* <p>
* Known sharp-edges/limitations:
* 1. The support is not yet fully aware of the difference between RANGE and ROWS when evaluating peers. (Note: The
* built-in functions are all implemented with the correctly defined semantics, so ranking functions that are
* defined to use RANGE do the right thing)
* 2. No support for framing last/first functions
* 3. No nth function
* 4. No finalization, meaning that aggregators like sketches that rely on finalization might return surprising results
* 5. No big big test suite of loveliness
*/
public class Windowing
{
@ -74,17 +87,20 @@ public class Windowing
.put("LEAD", (agg) -> new WindowOffsetProcessor(agg.getColumn(0), agg.getOutputName(), agg.getConstantInt(1)))
.put("FIRST_VALUE", (agg) -> new WindowFirstProcessor(agg.getColumn(0), agg.getOutputName()))
.put("LAST_VALUE", (agg) -> new WindowLastProcessor(agg.getColumn(0), agg.getOutputName()))
.put("CUME_DIST", (agg) -> new WindowCumeDistProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName()))
.put(
"CUME_DIST",
(agg) -> new WindowCumeDistProcessor(agg.getGroup().getOrderingColumnNames(), agg.getOutputName())
)
.put(
"DENSE_RANK",
(agg) -> new WindowDenseRankProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName())
(agg) -> new WindowDenseRankProcessor(agg.getGroup().getOrderingColumnNames(), agg.getOutputName())
)
.put("NTILE", (agg) -> new WindowPercentileProcessor(agg.getOutputName(), agg.getConstantInt(0)))
.put(
"PERCENT_RANK",
(agg) -> new WindowRankProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName(), true)
(agg) -> new WindowRankProcessor(agg.getGroup().getOrderingColumnNames(), agg.getOutputName(), true)
)
.put("RANK", (agg) -> new WindowRankProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName(), false))
.put("RANK", (agg) -> new WindowRankProcessor(agg.getGroup().getOrderingColumNames(), agg.getOutputName(), false))
.put("ROW_NUMBER", (agg) -> new WindowRowNumberProcessor(agg.getOutputName()))
.build();
private final List<OperatorFactory> ops;
@@ -99,93 +115,103 @@ public class Windowing
{
final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");
// Right now, we assume that there is only a single grouping as our code cannot handle re-sorting and
// re-partitioning. As we relax that restriction, we will be able to plan multiple different groupings.
if (window.groups.size() != 1) {
plannerContext.setPlanningError("Multiple windows are not supported");
throw new CannotBuildQueryException(window);
}
final WindowGroup group = new WindowGroup(window, Iterables.getOnlyElement(window.groups), rowSignature);
// Presently, the order by keys are not validated to ensure that the incoming query has pre-sorted the data
// as required by the window query. This should be done. In order to do it, we will need to know what the
// sub-query that we are running against actually looks like in order to then validate that the data will
// come back in the order expected...
// Aggregations.
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", rowSignature.getColumnNames());
final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
final List<Processor> processors = new ArrayList<>();
final List<AggregatorFactory> aggregations = new ArrayList<>();
final List<String> expectedOutputColumns = new ArrayList<>(rowSignature.getColumnNames());
for (int i = 0; i < aggregateCalls.size(); i++) {
final String aggName = outputNamePrefix + i;
expectedOutputColumns.add(aggName);
final AggregateCall aggCall = aggregateCalls.get(i);
ProcessorMaker maker = KNOWN_WINDOW_FNS.get(aggCall.getAggregation().getName());
if (maker == null) {
final Aggregation aggregation = GroupByRules.translateAggregateCall(
plannerContext,
rowSignature,
null,
rexBuilder,
partialQuery.getSelectProject(),
Collections.emptyList(),
aggName,
aggCall,
false // Windowed aggregations don't currently finalize. This means that sketches won't work as expected.
);
if (aggregation == null
|| aggregation.getPostAggregator() != null
|| aggregation.getAggregatorFactories().size() != 1) {
if (null == plannerContext.getPlanningError()) {
plannerContext.setPlanningError("Aggregation [%s] is not supported", aggCall);
}
throw new CannotBuildQueryException(window, aggCall);
}
aggregations.add(Iterables.getOnlyElement(aggregation.getAggregatorFactories()));
} else {
processors.add(maker.make(
new WindowAggregate(
aggName,
aggCall,
rowSignature,
plannerContext,
partialQuery.getSelectProject(),
window.constants,
group
)
));
}
}
if (!aggregations.isEmpty()) {
processors.add(
new WindowFramedAggregateProcessor(
group.getWindowFrame(),
aggregations.toArray(new AggregatorFactory[0])
)
);
}
if (processors.isEmpty()) {
throw new ISE("No processors from Window[%s], why was this code called?", window);
}
final List<OperatorFactory> ops = Arrays.asList(
new NaivePartitioningOperatorFactory(group.getPartitionColumns()),
new WindowOperatorFactory(
processors.size() == 1 ?
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
)
);
ArrayList<OperatorFactory> ops = new ArrayList<>();
final List<String> expectedOutputColumns = new ArrayList<>(rowSignature.getColumnNames());
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", rowSignature.getColumnNames());
int outputNameCounter = 0;
for (int i = 0; i < window.groups.size(); ++i) {
final WindowGroup group = new WindowGroup(window, window.groups.get(i), rowSignature);
if (i > 0) {
LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
}
// Presently, the order by keys are not validated to ensure that the incoming query has pre-sorted the data
// as required by the window query. This should be done. In order to do it, we will need to know what the
// sub-query that we are running against actually looks like in order to then validate that the data will
// come back in the order expected. Unfortunately, the way that the queries are re-written to DruidRels
// loses all the context of sub-queries, making it not possible to validate this without changing how the
// various Druid rules work (i.e. a very large blast radius change). For now, it is easy enough to validate
// this when we build the native query, so we validate it there.
// Aggregations.
final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
final List<Processor> processors = new ArrayList<>();
final List<AggregatorFactory> aggregations = new ArrayList<>();
for (AggregateCall aggregateCall : aggregateCalls) {
final String aggName = outputNamePrefix + outputNameCounter++;
expectedOutputColumns.add(aggName);
ProcessorMaker maker = KNOWN_WINDOW_FNS.get(aggregateCall.getAggregation().getName());
if (maker == null) {
final Aggregation aggregation = GroupByRules.translateAggregateCall(
plannerContext,
rowSignature,
null,
rexBuilder,
partialQuery.getSelectProject(),
Collections.emptyList(),
aggName,
aggregateCall,
false // Windowed aggregations don't currently finalize. This means that sketches won't work as expected.
);
if (aggregation == null
|| aggregation.getPostAggregator() != null
|| aggregation.getAggregatorFactories().size() != 1) {
if (null == plannerContext.getPlanningError()) {
plannerContext.setPlanningError("Aggregation [%s] is not supported", aggregateCall);
}
throw new CannotBuildQueryException(window, aggregateCall);
}
aggregations.add(Iterables.getOnlyElement(aggregation.getAggregatorFactories()));
} else {
processors.add(maker.make(
new WindowAggregate(
aggName,
aggregateCall,
rowSignature,
plannerContext,
partialQuery.getSelectProject(),
window.constants,
group
)
));
}
}
if (!aggregations.isEmpty()) {
processors.add(
new WindowFramedAggregateProcessor(
group.getWindowFrame(),
aggregations.toArray(new AggregatorFactory[0])
)
);
}
if (processors.isEmpty()) {
throw new ISE("No processors from Window[%s], why was this code called?", window);
}
// The ordering required for partitioning is actually not important for the semantics. However, it *is*
// important that it be consistent across the query. Because if the incoming data is sorted descending
// and we try to partition on an ascending sort, we will think the data is not sorted correctly
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
ops.add(new WindowOperatorFactory(
processors.size() == 1 ?
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
));
}
return new Windowing(
RowSignatures.fromRelDataType(expectedOutputColumns, window.getRowType()),
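To make the planning loop's output concrete, here is a hedged, illustrative sketch (not code from this commit) of the operator list it yields for a query with two window definitions; column names such as "d0", "a0", "w0" are hypothetical, the constructor usage follows this diff, and the package of WindowRowNumberProcessor is assumed from its use in KNOWN_WINDOW_FNS. Compare the expectedOperators in the failingTest YAML later in this commit.

static List<OperatorFactory> twoGroupPipeline()
{
  final List<OperatorFactory> ops = new ArrayList<>();
  // Group 0: rows arrive from the subquery already sorted for this group.
  ops.add(new NaivePartitioningOperatorFactory(new ArrayList<>(Arrays.asList("d0"))));
  ops.add(new WindowOperatorFactory(new WindowRowNumberProcessor("w0")));
  // Group 1 (i > 0): re-sort by partition columns ascending plus the group's ORDER BY keys.
  final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
  sortColumns.add(ColumnWithDirection.ascending("d1"));
  sortColumns.add(ColumnWithDirection.descending("a0"));
  ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
  ops.add(new NaivePartitioningOperatorFactory(new ArrayList<>(Arrays.asList("d1"))));
  ops.add(new WindowOperatorFactory(new WindowRowNumberProcessor("w1")));
  return ops;
}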
@@ -241,12 +267,38 @@
return retVal;
}
public ArrayList<String> getOrderingColumns()
public ArrayList<ColumnWithDirection> getOrdering()
{
final List<RelFieldCollation> fields = group.orderKeys.getFieldCollations();
ArrayList<String> retVal = new ArrayList<>(fields.size());
ArrayList<ColumnWithDirection> retVal = new ArrayList<>(fields.size());
for (RelFieldCollation field : fields) {
retVal.add(sig.getColumnName(field.getFieldIndex()));
final ColumnWithDirection.Direction direction;
switch (field.direction) {
case ASCENDING:
direction = ColumnWithDirection.Direction.ASC;
break;
case DESCENDING:
direction = ColumnWithDirection.Direction.DESC;
break;
default:
throw new QueryException(
QueryException.SQL_QUERY_UNSUPPORTED_ERROR_CODE,
StringUtils.format("Cannot handle ordering with direction[%s]", field.direction),
null,
null
);
}
retVal.add(new ColumnWithDirection(sig.getColumnName(field.getFieldIndex()), direction));
}
return retVal;
}
public ArrayList<String> getOrderingColumnNames()
{
final ArrayList<ColumnWithDirection> ordering = getOrdering();
final ArrayList<String> retVal = new ArrayList<>(ordering.size());
for (ColumnWithDirection column : ordering) {
retVal.add(column.getColumn());
}
return retVal;
}
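A quick hedged illustration of the two accessors (the column names here are invented): a window ordered by delta DESC, page ASC keeps its directions in getOrdering() for the sort operator, while getOrderingColumnNames() keeps only the names, which is all the ranking processors need since peer rows are found by equality on those columns.

// Hypothetical ordering for ORDER BY delta DESC, page ASC
ArrayList<ColumnWithDirection> ordering = new ArrayList<>(Arrays.asList(
    ColumnWithDirection.descending("delta"),
    ColumnWithDirection.ascending("page")
));
ArrayList<String> names = new ArrayList<>(ordering.size());
for (ColumnWithDirection column : ordering) {
  names.add(column.getColumn());
}
// names -> ["delta", "page"], as passed to WindowRankProcessor and friends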

View File

@@ -891,6 +891,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
results.get(i)
);
}
Assert.assertEquals(expectedResults.size(), results.size());
}
public void testQueryThrows(final String sql, Consumer<ExpectedException> expectedExceptionInitializer)

View File

@@ -28,7 +28,6 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.Query;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.segment.column.ColumnType;
@@ -54,8 +53,8 @@ import java.util.function.Function;
public class CalciteWindowQueryTest extends BaseCalciteQueryTest
{
public static final boolean DUMP_EXPECTED_RESULTS = Boolean.parseBoolean(
System.getProperty("druid.tests.sql.dumpExpectedResults")
public static final boolean DUMP_ACTUAL_RESULTS = Boolean.parseBoolean(
System.getProperty("druid.tests.sql.dumpActualResults")
);
static {
@@ -116,6 +115,10 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
}
};
if ("failingTest".equals(input.type)) {
return;
}
if ("operatorValidation".equals(input.type)) {
testBuilder()
.skipVectorize(true)
@@ -154,9 +157,9 @@
Assert.assertEquals(types[i], results.signature.getColumnType(i).get());
}
maybeDumpExpectedResults(jacksonToString, results.results);
maybeDumpActualResults(jacksonToString, results.results);
for (Object[] result : input.expectedResults) {
for (int i = 0; i < types.length; i++) {
for (int i = 0; i < result.length; i++) {
// Jackson deserializes numbers into the smallest type that can hold the value. This means that
// Longs can become Integer objects, which then fail equality checks. We read the expected
// results using Jackson, so we coerce the expected results to the expected type.
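The coercion described in that comment is easy to reproduce in isolation; this small sketch (assuming only jackson-databind on the classpath) shows why an expected Long would otherwise fail equality against the deserialized value:

import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonNumberSizing
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    Object parsed = mapper.readValue("1", Object.class);
    System.out.println(parsed.getClass());               // class java.lang.Integer
    System.out.println(Long.valueOf(1L).equals(parsed)); // false: Long != Integer
  }
}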
@@ -187,11 +190,11 @@
}
}
private void maybeDumpExpectedResults(
private void maybeDumpActualResults(
Function<Object, String> toStrFn, List<Object[]> results
)
{
if (DUMP_EXPECTED_RESULTS) {
if (DUMP_ACTUAL_RESULTS) {
for (Object[] result : results) {
System.out.println(" - " + toStrFn.apply(result));
}
@@ -206,9 +209,6 @@
@JsonProperty
public String sql;
@JsonProperty
public Query nativeQuery;
@JsonProperty
public List<OperatorFactory> expectedOperators;

View File

@@ -0,0 +1,25 @@
type: "failingTest"
sql: |
SELECT
countryIsoCode,
CAST (FLOOR(__time TO HOUR) AS BIGINT) t,
SUM(delta) delta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) DESC RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta,
ROW_NUMBER() OVER (PARTITION BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ORDER BY SUM(delta) DESC) AS hourlyRank
FROM wikipedia
GROUP BY 1, 2
ORDER BY 1 DESC, 2 DESC
expectedOperators:
- { type: "naivePartition", partitionColumns: [ "d0" ] }
- type: "window"
processor:
type: "framedAgg"
frame: { peerType: "ROWS", lowUnbounded: false, lowOffset: 3, uppUnbounded: false, uppOffset: 2 }
aggregations:
- { type: "longSum", name: "w0", fieldName: "a0" }
- { type: "naiveSort", columns: [ { column: "d1", direction: "DESC" }, { column: "a0", direction: "DESC"} ]}
- { type: "naivePartition", partitionColumns: [ "d1" ] }
- type: "window"
processor: { type: "rowNumber", outputColumn: "w1"}

View File

@@ -1,13 +1,13 @@
type: "operatorValidation"
sql: "
sql: |
SELECT
countryIsoCode,
FLOOR(__time TO HOUR) t,
CAST (FLOOR(__time TO HOUR) AS BIGINT) t,
SUM(delta) delta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY SUM(delta) RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta
FROM wikipedia
GROUP BY 1, 2"
GROUP BY 1, 2
expectedOperators:
- { type: "naivePartition", partitionColumns: [ "d0" ] }