Support Framing for Window Aggregations (#13514)

* Support Framing for Window Aggregations

This adds support for framing over ROWS for window aggregations; a usage sketch follows the list below.

Not yet implemented:
1. RANGE frames
2. Multiple different frames in the same query
3. Frames on last/first functions
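For illustration, the new pieces compose roughly as follows. This is a sketch based on the classes added in this commit; the input partition and the "delta"/"summedDelta" names are placeholders, and LongSumAggregatorFactory stands in for any aggregator:

// A ROWS frame covering the previous row, the current row, and the next row
// (roughly the equivalent of SQL's ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING).
WindowFrame frame = new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 1);

// Aggregate "delta" over that frame for every row of the partition.
Processor proc = new WindowFramedAggregateProcessor(
    frame,
    new AggregatorFactory[]{new LongSumAggregatorFactory("summedDelta", "delta")}
);

RowsAndColumns result = proc.process(inputPartition);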
imply-cheddar 2022-12-15 11:04:39 +09:00 committed by GitHub
parent 2729e25295
commit 089d8da561
69 changed files with 5352 additions and 2420 deletions

View File

@ -109,6 +109,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import javax.annotation.Nullable;
import java.io.File;
@ -201,6 +202,9 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public final TestName testName = new TestName();
/**
* Transient task failure rate emulated by the taskKiller in {@link SimpleThreadingTaskRunner}.
* Per {@link SubTaskSpec}, there could be at most one task failure.
@ -239,7 +243,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException
{
localDeepStorage = temporaryFolder.newFolder("localStorage");
taskRunner = new SimpleThreadingTaskRunner();
taskRunner = new SimpleThreadingTaskRunner(testName.getMethodName());
objectMapper = getObjectMapper();
indexingServiceClient = new LocalOverlordClient(objectMapper, taskRunner);
intermediaryDataManager = new LocalIntermediaryDataManager(
@ -358,15 +362,14 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public class SimpleThreadingTaskRunner
{
private final ConcurrentMap<String, TaskContainer> tasks = new ConcurrentHashMap<>();
private final ListeningExecutorService service = MoreExecutors.listeningDecorator(
Execs.multiThreaded(5, "simple-threading-task-runner-%d")
);
private final ListeningExecutorService service;
private final ScheduledExecutorService taskKiller = Execs.scheduledSingleThreaded("simple-threading-task-killer");
private final Set<String> killedSubtaskSpecs = new HashSet<>();
SimpleThreadingTaskRunner()
SimpleThreadingTaskRunner(String threadNameBase)
{
service = MoreExecutors.listeningDecorator(Execs.multiThreaded(5, threadNameBase + "-%d"));
taskKiller.scheduleAtFixedRate(
() -> {
for (TaskContainer taskContainer : tasks.values()) {

View File

@ -73,12 +73,12 @@ public interface DataSource
/**
* Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
* for queries of those.
*
* <p>
* Currently this is coupled with joinability - if this returns true then the query engine expects there exists a
* {@link org.apache.druid.segment.join.JoinableFactory} which might build a
* {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is
* required to join this datasource on the right hand side, then this value must be false for now.
*
* <p>
* In the future, instead of directly using this method, the query planner and engine should consider
* {@link org.apache.druid.segment.join.JoinableFactory#isDirectlyJoinable(DataSource)} when determining if the
* right hand side is directly joinable, which would allow decoupling this property from joins.
@ -97,7 +97,7 @@ public interface DataSource
/**
* Returns a segment function on to how to segment should be modified.
*
* @param query the input query
* @param query the input query
* @param cpuTimeAcc the cpu time accumulator
* @return the segment function
*/

View File

@ -81,9 +81,9 @@ public class LimitedRowsAndColumns implements RowsAndColumns
}
@Override
protected int getActualCell(int cell)
protected int getActualValue(int rowNum)
{
int retVal = start + cell;
int retVal = start + rowNum;
if (retVal >= end) {
throw new ISE("Index out of bounds[%d] >= [%d], start[%s]", retVal, end, start);
}
@ -91,7 +91,7 @@ public class LimitedRowsAndColumns implements RowsAndColumns
}
@Override
protected boolean outsideBounds(int cell)
protected boolean outsideBounds(int rowNum)
{
return false;
}

View File

@ -20,9 +20,9 @@
package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.SortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
import java.util.Iterator;
import java.util.List;

View File

@ -56,6 +56,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
@JsonSubTypes.Type(name = "last", value = WindowLastProcessor.class),
@JsonSubTypes.Type(name = "offset", value = WindowOffsetProcessor.class),
@JsonSubTypes.Type(name = "aggregate", value = WindowAggregateProcessor.class),
@JsonSubTypes.Type(name = "framedAgg", value = WindowFramedAggregateProcessor.class)
})
public interface Processor
{

View File

@ -22,13 +22,13 @@ package org.apache.druid.query.operator.window;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.DefaultOnHeapAggregatable;
import org.apache.druid.query.rowsandcols.OnHeapAggregatable;
import org.apache.druid.query.rowsandcols.OnHeapCumulativeAggregatable;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultOnHeapAggregatable;
import org.apache.druid.query.rowsandcols.semantic.OnHeapAggregatable;
import javax.annotation.Nullable;
import java.util.ArrayList;

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator.window;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class WindowFrame
{
@SuppressWarnings("unused")
public enum PeerType
{
ROWS,
RANGE
}
// Will likely need to add the order by columns to also be able to deal with RANGE peer type.
private final PeerType peerType;
private final boolean lowerUnbounded;
private final int lowerOffset;
private final boolean upperUnbounded;
private final int upperOffset;
@JsonCreator
public WindowFrame(
@JsonProperty("peerType") PeerType peerType,
@JsonProperty("lowUnbounded") boolean lowerUnbounded,
@JsonProperty("lowOffset") int lowerOffset,
@JsonProperty("uppUnbounded") boolean upperUnbounded,
@JsonProperty("uppOffset") int upperOffset
)
{
this.peerType = peerType;
this.lowerUnbounded = lowerUnbounded;
this.lowerOffset = lowerOffset;
this.upperUnbounded = upperUnbounded;
this.upperOffset = upperOffset;
}
@JsonProperty("peerType")
public PeerType getPeerType()
{
return peerType;
}
@JsonProperty("lowUnbounded")
public boolean isLowerUnbounded()
{
return lowerUnbounded;
}
@JsonProperty("lowOffset")
public int getLowerOffset()
{
return lowerOffset;
}
@JsonProperty("uppUnbounded")
public boolean isUpperUnbounded()
{
return upperUnbounded;
}
@JsonProperty("uppOffset")
public int getUpperOffset()
{
return upperOffset;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof WindowFrame)) {
return false;
}
WindowFrame that = (WindowFrame) o;
return lowerUnbounded == that.lowerUnbounded
&& lowerOffset == that.lowerOffset
&& upperUnbounded == that.upperUnbounded
&& upperOffset == that.upperOffset
&& peerType == that.peerType;
}
@Override
public int hashCode()
{
return Objects.hash(peerType, lowerUnbounded, lowerOffset, upperUnbounded, upperOffset);
}
@Override
public String toString()
{
return "WindowFrame{" +
"peerType=" + peerType +
", lowerUnbounded=" + lowerUnbounded +
", lowerOffset=" + lowerOffset +
", upperUnbounded=" + upperUnbounded +
", upperOffset=" + upperOffset +
'}';
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator.window;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultFramedOnHeapAggregatable;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import javax.annotation.Nullable;
import java.util.Arrays;
public class WindowFramedAggregateProcessor implements Processor
{
@Nullable
private static <T> T[] emptyToNull(T[] arr)
{
if (arr == null || arr.length == 0) {
return null;
} else {
return arr;
}
}
private final WindowFrame frame;
private final AggregatorFactory[] aggregations;
@JsonCreator
public WindowFramedAggregateProcessor(
@JsonProperty("frame") WindowFrame frame,
@JsonProperty("aggregations") AggregatorFactory[] aggregations
)
{
this.frame = frame;
this.aggregations = emptyToNull(aggregations);
}
@JsonProperty("frame")
public WindowFrame getFrame()
{
return frame;
}
@JsonProperty("aggregations")
public AggregatorFactory[] getAggregations()
{
return aggregations;
}
@Override
public RowsAndColumns process(RowsAndColumns inputPartition)
{
FramedOnHeapAggregatable agger = inputPartition.as(FramedOnHeapAggregatable.class);
if (agger == null) {
agger = new DefaultFramedOnHeapAggregatable(RowsAndColumns.expectAppendable(inputPartition));
}
return agger.aggregateAll(frame, aggregations);
}
@Override
public boolean validateEquivalent(Processor otherProcessor)
{
if (otherProcessor instanceof WindowFramedAggregateProcessor) {
WindowFramedAggregateProcessor other = (WindowFramedAggregateProcessor) otherProcessor;
return frame.equals(other.frame) && Arrays.equals(aggregations, other.aggregations);
}
return false;
}
@Override
public String toString()
{
return "WindowFramedAggregateProcessor{" +
"frame=" + frame +
", aggregations=" + Arrays.toString(aggregations) +
'}';
}
}

View File

@ -23,9 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.Arrays;

View File

@ -21,11 +21,11 @@ package org.apache.druid.query.operator.window.ranking;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.SortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
import java.util.List;
import java.util.function.Function;

View File

@ -22,10 +22,10 @@ package org.apache.druid.query.operator.window.ranking;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
public class WindowRowNumberProcessor implements Processor
@ -105,7 +105,7 @@ public class WindowRowNumberProcessor implements Processor
}
@Override
public int compareCells(int lhsRowNum, int rhsRowNum)
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return Integer.compare(lhsRowNum, rhsRowNum);
}

View File

@ -46,7 +46,7 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
@Override
public boolean isNull(int rowNum)
{
final int actualCell = getActualCell(rowNum);
final int actualCell = getActualValue(rowNum);
if (outsideBounds(actualCell)) {
return true;
}
@ -56,7 +56,7 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
@Override
public Object getObject(int rowNum)
{
final int actualCell = getActualCell(rowNum);
final int actualCell = getActualValue(rowNum);
if (outsideBounds(actualCell)) {
return null;
}
@ -66,7 +66,7 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
@Override
public double getDouble(int rowNum)
{
final int actualCell = getActualCell(rowNum);
final int actualCell = getActualValue(rowNum);
if (outsideBounds(actualCell)) {
return 0.0D;
}
@ -76,7 +76,7 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
@Override
public float getFloat(int rowNum)
{
final int actualCell = getActualCell(rowNum);
final int actualCell = getActualValue(rowNum);
if (outsideBounds(actualCell)) {
return 0.0F;
}
@ -86,7 +86,7 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
@Override
public long getLong(int rowNum)
{
final int actualCell = getActualCell(rowNum);
final int actualCell = getActualValue(rowNum);
if (outsideBounds(actualCell)) {
return 0L;
}
@ -96,7 +96,7 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
@Override
public int getInt(int rowNum)
{
final int actualCell = getActualCell(rowNum);
final int actualCell = getActualValue(rowNum);
if (outsideBounds(actualCell)) {
return 0;
}
@ -104,10 +104,10 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
}
@Override
public int compareCells(int lhsRowNum, int rhsRowNum)
public int compareRows(int lhsRowNum, int rhsRowNum)
{
int actualLhsCell = getActualCell(lhsRowNum);
int actualRhsCell = getActualCell(rhsRowNum);
int actualLhsCell = getActualValue(lhsRowNum);
int actualRhsCell = getActualValue(rhsRowNum);
if (outsideBounds(actualLhsCell)) {
if (outsideBounds(actualRhsCell)) {
// Both are null
@ -119,12 +119,12 @@ public abstract class ShiftedColumnAccessorBase implements ColumnAccessor
if (outsideBounds(actualRhsCell)) {
return accessor.isNull(actualLhsCell) ? 0 : 1;
} else {
return accessor.compareCells(actualLhsCell, actualRhsCell);
return accessor.compareRows(actualLhsCell, actualRhsCell);
}
}
}
protected abstract int getActualCell(int cell);
protected abstract int getActualValue(int rowNum);
protected abstract boolean outsideBounds(int cell);
protected abstract boolean outsideBounds(int rowNum);
}

View File

@ -55,15 +55,15 @@ public class WindowOffsetProcessor extends WindowValueProcessorBase
new ShiftedColumnAccessorBase(column.toAccessor())
{
@Override
protected int getActualCell(int cell)
protected int getActualValue(int rowNum)
{
return cell + offset;
return rowNum + offset;
}
@Override
protected boolean outsideBounds(int actualCell)
protected boolean outsideBounds(int rowNum)
{
return actualCell < 0 || actualCell >= numRows;
return rowNum < 0 || rowNum >= numRows;
}
}));
}

View File

@ -22,9 +22,9 @@ package org.apache.druid.query.operator.window.value;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.function.Function;

View File

@ -17,12 +17,11 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols.frame;
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.Collection;
import java.util.LinkedHashMap;

View File

@ -83,9 +83,9 @@ public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
return new ObjectColumnAccessorBase()
{
@Override
protected Object getVal(int cell)
protected Object getVal(int rowNum)
{
return adapterForValue.apply(rows.get(cell));
return adapterForValue.apply(rows.get(rowNum));
}
@Override

View File

@ -17,13 +17,12 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols.frame;
package org.apache.druid.query.rowsandcols;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.Iterator;
import java.util.Map;

View File

@ -20,7 +20,8 @@
package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.frame.AppendableMapOfColumns;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.OnHeapAggregatable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

View File

@ -100,5 +100,5 @@ public interface ColumnAccessor
* @param rhsRowNum the cell id of the right-hand-side of the comparison
* @return the result of the comparison of the two cells
*/
int compareCells(int lhsRowNum, int rhsRowNum);
int compareRows(int lhsRowNum, int rhsRowNum);
}

View File

@ -88,7 +88,7 @@ public class ConstantObjectColumn implements Column
}
@Override
public int compareCells(int lhsRowNum, int rhsRowNum)
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return 0;
}

View File

@ -86,7 +86,7 @@ public class DoubleArrayColumn implements Column
}
@Override
public int compareCells(int lhsRowNum, int rhsRowNum)
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return Double.compare(lhsRowNum, rhsRowNum);
}

View File

@ -86,7 +86,7 @@ public class IntArrayColumn implements Column
}
@Override
public int compareCells(int lhsRowNum, int rhsRowNum)
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return Integer.compare(vals[lhsRowNum], vals[rhsRowNum]);
}

View File

@ -89,7 +89,7 @@ public class NullColumnAccessor implements ColumnAccessor
}
@Override
public int compareCells(int lhsRowNum, int rhsRowNum)
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return 0;
}

View File

@ -47,9 +47,9 @@ public class ObjectArrayColumn implements Column
return new ObjectColumnAccessorBase()
{
@Override
protected Object getVal(int cell)
protected Object getVal(int rowNum)
{
return objects[cell];
return objects[rowNum];
}
@Override

View File

@ -110,12 +110,12 @@ public abstract class ObjectColumnAccessorBase implements ColumnAccessor
}
@Override
public int compareCells(int lhsRowNum, int rhsRowNum)
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return getComparator().compare(getVal(lhsRowNum), getVal(rhsRowNum));
}
protected abstract Object getVal(int cell);
protected abstract Object getVal(int rowNum);
protected abstract Comparator<Object> getComparator();
}

View File

@ -17,8 +17,9 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
/**

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.segment.ColumnSelectorFactory;
import java.util.concurrent.atomic.AtomicInteger;
public interface ColumnSelectorFactoryMaker
{
static ColumnSelectorFactoryMaker fromRAC(RowsAndColumns rac)
{
ColumnSelectorFactoryMaker retVal = rac.as(ColumnSelectorFactoryMaker.class);
if (retVal == null) {
retVal = new DefaultColumnSelectorFactoryMaker(rac);
}
return retVal;
}
ColumnSelectorFactory make(AtomicInteger rowIdProvider);
}
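
A minimal sketch of how the callers added in this commit drive the returned factory (it mirrors DefaultFramedOnHeapAggregatable.computeUnboundedAggregates below); the aggregator and the "sum"/"delta" names are placeholder assumptions:

AtomicInteger rowId = new AtomicInteger(0);
ColumnSelectorFactory csf = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowId);
Aggregator agg = new LongSumAggregatorFactory("sum", "delta").factorize(csf);

// The factory's selectors read whichever row the shared AtomicInteger points at,
// so advancing the counter walks the aggregation across the RowsAndColumns.
while (rowId.get() < rac.numRows()) {
  agg.aggregate();
  rowId.incrementAndGet();
}
Object total = agg.get();
agg.close();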

View File

@ -17,13 +17,12 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.BaseSingleValueDimensionSelector;
@ -38,7 +37,6 @@ import org.apache.druid.segment.serde.ComplexMetrics;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -46,81 +44,35 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable
public class DefaultColumnSelectorFactoryMaker implements ColumnSelectorFactoryMaker
{
private final RowsAndColumns rac;
public DefaultOnHeapAggregatable(
RowsAndColumns rac
)
public DefaultColumnSelectorFactoryMaker(RowsAndColumns rac)
{
this.rac = rac;
}
@Override
public ArrayList<Object> aggregateAll(
List<AggregatorFactory> aggFactories
)
public ColumnSelectorFactory make(AtomicInteger rowIdProvider)
{
Aggregator[] aggs = new Aggregator[aggFactories.size()];
AtomicInteger currRow = new AtomicInteger(0);
int index = 0;
for (AggregatorFactory aggFactory : aggFactories) {
aggs[index++] = aggFactory.factorize(new ColumnAccessorBasedColumnSelectorFactory(currRow));
}
int numRows = rac.numRows();
int rowId = currRow.get();
while (rowId < numRows) {
for (Aggregator agg : aggs) {
agg.aggregate();
}
rowId = currRow.incrementAndGet();
}
ArrayList<Object> retVal = new ArrayList<>(aggs.length);
for (Aggregator agg : aggs) {
retVal.add(agg.get());
}
return retVal;
return new ColumnAccessorBasedColumnSelectorFactory(rowIdProvider, rac);
}
@Override
public ArrayList<Object[]> aggregateCumulative(List<AggregatorFactory> aggFactories)
{
Aggregator[] aggs = new Aggregator[aggFactories.size()];
ArrayList<Object[]> retVal = new ArrayList<>(aggFactories.size());
int numRows = rac.numRows();
AtomicInteger currRow = new AtomicInteger(0);
int index = 0;
for (AggregatorFactory aggFactory : aggFactories) {
aggs[index++] = aggFactory.factorize(new ColumnAccessorBasedColumnSelectorFactory(currRow));
retVal.add(new Object[numRows]);
}
int rowId = currRow.get();
while (rowId < numRows) {
for (int i = 0; i < aggs.length; ++i) {
aggs[i].aggregate();
retVal.get(i)[rowId] = aggs[i].get();
}
rowId = currRow.incrementAndGet();
}
return retVal;
}
private class ColumnAccessorBasedColumnSelectorFactory implements ColumnSelectorFactory
public static class ColumnAccessorBasedColumnSelectorFactory implements ColumnSelectorFactory
{
private final Map<String, ColumnAccessor> accessorCache = new HashMap<>();
private final AtomicInteger cellIdSupplier;
private final RowsAndColumns rac;
public ColumnAccessorBasedColumnSelectorFactory(AtomicInteger cellIdSupplier)
public ColumnAccessorBasedColumnSelectorFactory(
AtomicInteger cellIdSupplier,
RowsAndColumns rac
)
{
this.cellIdSupplier = cellIdSupplier;
this.rac = rac;
}
@Override

View File

@ -0,0 +1,581 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
{
private final AppendableRowsAndColumns rac;
public DefaultFramedOnHeapAggregatable(
AppendableRowsAndColumns rac
)
{
this.rac = rac;
}
@Override
public RowsAndColumns aggregateAll(
WindowFrame frame,
AggregatorFactory[] aggFactories
)
{
if (frame.isLowerUnbounded() && frame.isUpperUnbounded()) {
return computeUnboundedAggregates(aggFactories);
}
if (frame.getPeerType() == WindowFrame.PeerType.ROWS) {
if (frame.isLowerUnbounded()) {
return computeCumulativeAggregates(aggFactories, frame.getUpperOffset());
} else if (frame.isUpperUnbounded()) {
return computeReverseCumulativeAggregates(aggFactories, frame.getLowerOffset());
} else {
final int numRows = rac.numRows();
int lowerOffset = frame.getLowerOffset();
int upperOffset = frame.getUpperOffset();
if (numRows < lowerOffset + upperOffset + 1) {
// In this case, there are not enough rows to completely build up the full window aperture before it needs to
// also start contracting the aperture because of the upper offset. So we use a method that specifically
// handles checks for both expanding and reducing the aperture on every iteration.
return aggregateWindowApertureInFlux(aggFactories, lowerOffset, upperOffset);
} else {
// In this case, there are 3 distinct phases that allow us to loop with less
// branches, so we have a method that specifically does that.
return aggregateWindowApertureWellBehaved(aggFactories, lowerOffset, upperOffset);
}
}
} else {
throw new UOE("RANGE peer groupings are unsupported");
}
}
private AppendableRowsAndColumns computeUnboundedAggregates(AggregatorFactory[] aggFactories)
{
Aggregator[] aggs = new Aggregator[aggFactories.length];
AtomicInteger currRow = new AtomicInteger(0);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(currRow);
for (int i = 0; i < aggFactories.length; i++) {
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
int numRows = rac.numRows();
int rowId = currRow.get();
while (rowId < numRows) {
for (Aggregator agg : aggs) {
agg.aggregate();
}
rowId = currRow.incrementAndGet();
}
for (int i = 0; i < aggFactories.length; ++i) {
rac.addColumn(
aggFactories[i].getName(),
new ConstantObjectColumn(aggs[i].get(), numRows, aggFactories[i].getIntermediateType())
);
aggs[i].close();
}
return rac;
}
private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[] aggFactories, int upperOffset)
{
int numRows = rac.numRows();
if (upperOffset > numRows) {
return computeUnboundedAggregates(aggFactories);
}
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
int resultStorageIndex = 0;
AtomicInteger rowIdProvider = new AtomicInteger(0);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
AggregatorFactory[] combiningFactories = new AggregatorFactory[aggFactories.length];
Aggregator[] aggs = new Aggregator[aggFactories.length];
for (int i = 0; i < aggFactories.length; i++) {
combiningFactories[i] = aggFactories[i].getCombiningFactory();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
// If there is an upper offset, we accumulate those aggregations before starting to generate results
for (int i = 0; i < upperOffset; ++i) {
for (Aggregator agg : aggs) {
agg.aggregate();
}
rowIdProvider.incrementAndGet();
}
// Prime the results
if (rowIdProvider.get() < numRows) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
aggs[i].close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
++resultStorageIndex;
rowIdProvider.incrementAndGet();
}
// From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
aggs[i].close();
// Use a combining aggregator to combine the result we just got with the result from the previous row
// This is a lot of hoops to jump through just to combine two values, but AggregatorFactory.combine
// allows for mutation of either of the arguments passed in, so it cannot be meaningfully used in this
// context. Instead, we have to jump through these hoops to make sure that we are generating a new object.
// It would've been nice if the AggregatorFactory interface had methods that were more usable for this,
// but it doesn't so :shrug:
final CumulativeColumnSelectorFactory combiningFactory = new CumulativeColumnSelectorFactory(
aggFactories[i],
results[i],
resultStorageIndex - 1
);
final Aggregator combiningAgg = combiningFactories[i].factorize(combiningFactory);
combiningAgg.aggregate();
combiningFactory.increment();
combiningAgg.aggregate();
results[i][resultStorageIndex] = combiningAgg.get();
combiningAgg.close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
++resultStorageIndex;
rowIdProvider.incrementAndGet();
}
// If we haven't filled up all of the results yet, there are no more rows, so just point the rest of the results
// at the last result that we generated
for (Object[] resultArr : results) {
Arrays.fill(resultArr, resultStorageIndex, resultArr.length, resultArr[resultStorageIndex - 1]);
}
return makeReturnRAC(aggFactories, results);
}
private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFactory[] aggFactories, int lowerOffset)
{
int numRows = rac.numRows();
if (lowerOffset > numRows) {
return computeUnboundedAggregates(aggFactories);
}
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
int resultStorageIndex = numRows - 1;
AtomicInteger rowIdProvider = new AtomicInteger(numRows - 1);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
AggregatorFactory[] combiningFactories = new AggregatorFactory[aggFactories.length];
Aggregator[] aggs = new Aggregator[aggFactories.length];
for (int i = 0; i < aggFactories.length; i++) {
combiningFactories[i] = aggFactories[i].getCombiningFactory();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
// If there is a lower offset, we accumulate those aggregations before starting to generate results
for (int i = 0; i < lowerOffset; ++i) {
for (Aggregator agg : aggs) {
agg.aggregate();
}
rowIdProvider.decrementAndGet();
}
// Prime the results
if (rowIdProvider.get() >= 0) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
aggs[i].close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
--resultStorageIndex;
rowIdProvider.decrementAndGet();
}
// From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
aggs[i].close();
// Use a combining aggregator to combine the result we just got with the result from the previous row
// This is a lot of hoops to jump through just to combine two values, but AggregatorFactory.combine
// allows for mutation of either of the arguments passed in, so it cannot be meaningfully used in this
// context. Instead, we have to jump through these hoops to make sure that we are generating a new object.
// It would've been nice if the AggregatorFactory interface had methods that were more usable for this,
// but it doesn't so :shrug:
final CumulativeColumnSelectorFactory combiningFactory = new CumulativeColumnSelectorFactory(
aggFactories[i],
results[i],
resultStorageIndex + 1
);
final Aggregator combiningAgg = combiningFactories[i].factorize(combiningFactory);
combiningAgg.aggregate();
combiningFactory.decrement();
combiningAgg.aggregate();
results[i][resultStorageIndex] = combiningAgg.get();
combiningAgg.close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
--resultStorageIndex;
rowIdProvider.decrementAndGet();
}
// If we haven't filled up all of the results yet, there are no more rows, so just point the rest of the results
// at the last result that we generated
for (Object[] resultArr : results) {
Arrays.fill(resultArr, 0, resultStorageIndex + 1, resultArr[resultStorageIndex + 1]);
}
return makeReturnRAC(aggFactories, results);
}
private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
AggregatorFactory[] aggFactories,
int lowerOffset,
int upperOffset
)
{
/**
* There are 3 different phases of operation when we have more rows than our window size
* 1. Our window is not full, as we walk the rows we build up towards filling it
* 2. Our window is full, as we walk the rows we take a value off and add a new aggregation
* 3. We are nearing the end of the rows, we need to start shrinking the window aperture
*/
int numRows = rac.numRows();
int windowSize = lowerOffset + upperOffset + 1;
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
int resultStorageIndex = 0;
AtomicInteger rowIdProvider = new AtomicInteger(0);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
// This is the number of aggregators to actually aggregate for the current row.
// Which also doubles as the nextIndex to roll through as we roll things in and out of the window
int nextIndex = lowerOffset + 1;
Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
for (int i = 0; i < aggregators.length; i++) {
final AggregatorFactory aggFactory = aggFactories[i];
// instantiate the aggregators that need to be read on the first row.
for (int j = 0; j < nextIndex; j++) {
aggregators[i][j] = aggFactory.factorize(columnSelectorFactory);
}
}
// The first few rows will slowly build out the window to consume the upper-offset. The window will not
// be full until we have walked upperOffset number of rows, so phase 1 runs until we have consumed
// upperOffset number of rows.
for (int upperIndex = 0; upperIndex < upperOffset; ++upperIndex) {
for (Aggregator[] aggregator : aggregators) {
for (int j = 0; j < nextIndex; ++j) {
aggregator[j].aggregate();
}
}
for (int i = 0; i < aggFactories.length; ++i) {
aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory);
}
++nextIndex;
rowIdProvider.incrementAndGet();
}
// End Phase 1, Enter Phase 2. At this point, nextIndex == windowSize, rowIdProvider is the same as
// upperOffset and the aggregators matrix is entirely non-null. We need to iterate until our window has all of
// the aggregators in it to fill up the final result set.
int endResultStorageIndex = numRows - windowSize;
for (; resultStorageIndex < endResultStorageIndex; ++resultStorageIndex) {
for (Aggregator[] aggregator : aggregators) {
for (Aggregator value : aggregator) {
value.aggregate();
}
}
if (nextIndex == windowSize) {
// Wrap back around and start pruning from the beginning of the window
nextIndex = 0;
}
for (int i = 0; i < aggFactories.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory);
}
++nextIndex;
rowIdProvider.incrementAndGet();
}
if (nextIndex == windowSize) {
nextIndex = 0;
}
// End Phase 2, enter Phase 3. At this point, our window has enough aggregators in it to fill up our final
// result set. This means that for each new row that we complete, the window will "shrink" until we hit numRows,
// at which point we will collect anything yet remaining and be done.
if (nextIndex != 0) {
// Start by organizing the aggregators so that we are 0-indexed from nextIndex. This trades off creating
// a new array of references in exchange for removing branches inside of the loop. It also makes the logic
// simpler to understand.
Aggregator[][] reorganizedAggs = new Aggregator[aggFactories.length][windowSize];
for (int i = 0; i < aggFactories.length; i++) {
System.arraycopy(aggregators[i], nextIndex, reorganizedAggs[i], 0, windowSize - nextIndex);
System.arraycopy(aggregators[i], 0, reorganizedAggs[i], windowSize - nextIndex, nextIndex);
}
aggregators = reorganizedAggs;
nextIndex = 0;
}
for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
for (Aggregator[] aggregator : aggregators) {
for (int j = nextIndex; j < aggregator.length; ++j) {
aggregator[j].aggregate();
}
}
for (int i = 0; i < aggFactories.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = null;
}
++nextIndex;
++resultStorageIndex;
rowIdProvider.incrementAndGet();
}
// End Phase 3, anything left in the window needs to be collected and put into our results
for (; nextIndex < windowSize; ++nextIndex) {
for (int i = 0; i < aggFactories.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
aggregators[i][nextIndex] = null;
}
++resultStorageIndex;
}
return makeReturnRAC(aggFactories, results);
}
private AppendableRowsAndColumns aggregateWindowApertureInFlux(
AggregatorFactory[] aggFactories,
int lowerOffset,
int upperOffset
)
{
int numRows = rac.numRows();
int windowSize = numRows;
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
int resultStorageIndex = 0;
AtomicInteger rowIdProvider = new AtomicInteger(0);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
for (int i = 0; i < aggregators.length; i++) {
final AggregatorFactory aggFactory = aggFactories[i];
for (int j = 0; j < aggregators[i].length; j++) {
aggregators[i][j] = aggFactory.factorize(columnSelectorFactory);
}
}
// This is the index to stop at for the current window aperture
// The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row"
int stopIndex = Math.min(lowerOffset + 1, numRows);
int startIndex = 0;
int rowId = rowIdProvider.get();
while (rowId < numRows) {
for (Aggregator[] aggregator : aggregators) {
for (int j = startIndex; j < stopIndex; ++j) {
aggregator[j].aggregate();
}
}
if (rowId >= upperOffset) {
for (int i = 0; i < aggregators.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][startIndex].get();
aggregators[i][startIndex].close();
aggregators[i][startIndex] = null;
}
++resultStorageIndex;
++startIndex;
}
if (stopIndex < windowSize) {
++stopIndex;
}
rowId = rowIdProvider.incrementAndGet();
}
for (; startIndex < windowSize; ++startIndex) {
for (int i = 0; i < aggregators.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][startIndex].get();
aggregators[i][startIndex].close();
aggregators[i][startIndex] = null;
}
++resultStorageIndex;
}
return makeReturnRAC(aggFactories, results);
}
private AppendableRowsAndColumns makeReturnRAC(AggregatorFactory[] aggFactories, Object[][] results)
{
for (int i = 0; i < aggFactories.length; ++i) {
rac.addColumn(
aggFactories[i].getName(), new ObjectArrayColumn(results[i], aggFactories[i].getIntermediateType())
);
}
return rac;
}
private static class CumulativeColumnSelectorFactory implements ColumnSelectorFactory
{
private final ColumnCapabilitiesImpl columnCapabilities;
private final Object[] results;
private int index;
public CumulativeColumnSelectorFactory(AggregatorFactory factory, Object[] results, int initialIndex)
{
this.results = results;
this.index = initialIndex;
this.columnCapabilities = new ColumnCapabilitiesImpl()
.setHasBitmapIndexes(false)
.setDictionaryEncoded(false)
.setHasMultipleValues(false)
.setDictionaryValuesUnique(false)
.setFilterable(false)
.setType(factory.getIntermediateType());
}
public void increment()
{
++index;
}
public void decrement()
{
--index;
}
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
throw new UOE("combining factory shouldn't need dimensions, just columnValue, dim[%s]", dimensionSpec);
}
@Override
public ColumnValueSelector makeColumnValueSelector(String columnName)
{
return new ObjectColumnSelector()
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
@Nullable
@Override
public Object getObject()
{
return results[index];
}
@Override
public Class classOfObject()
{
return results[index].getClass();
}
};
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return columnCapabilities;
}
}
}

View File

@ -17,9 +17,10 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
@ -55,7 +56,7 @@ public class DefaultGroupPartitioner implements GroupPartitioner
int prevGroupVal = 0;
for (int i = 1; i < retVal.length; ++i) {
if (retVal[i] == prevGroupVal) {
int comparison = accessor.compareCells(i - 1, i);
int comparison = accessor.compareRows(i - 1, i);
if (comparison == 0) {
retVal[i] = currGroup;
continue;

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.rowsandcols.OnHeapCumulativeAggregatable;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable
{
private final RowsAndColumns rac;
public DefaultOnHeapAggregatable(
RowsAndColumns rac
)
{
this.rac = rac;
}
@Override
public ArrayList<Object> aggregateAll(
List<AggregatorFactory> aggFactories
)
{
Aggregator[] aggs = new Aggregator[aggFactories.size()];
AtomicInteger currRow = new AtomicInteger(0);
int index = 0;
for (AggregatorFactory aggFactory : aggFactories) {
aggs[index++] = aggFactory.factorize(new DefaultColumnSelectorFactoryMaker.ColumnAccessorBasedColumnSelectorFactory(
currRow,
rac
));
}
int numRows = rac.numRows();
int rowId = currRow.get();
while (rowId < numRows) {
for (Aggregator agg : aggs) {
agg.aggregate();
}
rowId = currRow.incrementAndGet();
}
ArrayList<Object> retVal = new ArrayList<>(aggs.length);
for (Aggregator agg : aggs) {
retVal.add(agg.get());
agg.close();
}
return retVal;
}
@Override
public ArrayList<Object[]> aggregateCumulative(List<AggregatorFactory> aggFactories)
{
Aggregator[] aggs = new Aggregator[aggFactories.size()];
ArrayList<Object[]> retVal = new ArrayList<>(aggFactories.size());
int numRows = rac.numRows();
AtomicInteger currRow = new AtomicInteger(0);
int index = 0;
for (AggregatorFactory aggFactory : aggFactories) {
aggs[index++] = aggFactory.factorize(new DefaultColumnSelectorFactoryMaker.ColumnAccessorBasedColumnSelectorFactory(
currRow,
rac
));
retVal.add(new Object[numRows]);
}
int rowId = currRow.get();
while (rowId < numRows) {
for (int i = 0; i < aggs.length; ++i) {
aggs[i].aggregate();
retVal.get(i)[rowId] = aggs[i].get();
}
rowId = currRow.incrementAndGet();
}
for (Aggregator agg : aggs) {
agg.close();
}
return retVal;
}
}

View File

@ -17,12 +17,13 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.query.rowsandcols.semantic;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.LimitedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
@ -61,7 +62,7 @@ public class DefaultSortedGroupPartitioner implements SortedGroupPartitioner
int start = boundaries.getInt(i - 1);
int end = boundaries.getInt(i);
for (int j = start + 1; j < end; ++j) {
int comparison = accessor.compareCells(j - 1, j);
int comparison = accessor.compareRows(j - 1, j);
if (comparison < 0) {
newBoundaries.add(j);
} else if (comparison > 0) {

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
/**
* A semantic interface used to aggregate a list of AggregatorFactories across a given set of data
* <p>
* The aggregation specifically happens on-heap and should be used in places where it is known that the data
* set can be worked with entirely on-heap. There is support for frame definitions, frames aggregate certain
* subsets of rows in a rolling fashion like a windowed average. Frames are defined in terms of boundaries
* where a boundary could be based on rows or it could be based on "PEER" groupings.
* <p>
* A peer grouping is defined as a set of rows that are the same based on the ORDER BY columns specified. As such
* peer-grouped values must also come with a set of ORDER BY columns.
*/
public interface FramedOnHeapAggregatable
{
static FramedOnHeapAggregatable fromRAC(RowsAndColumns rac)
{
FramedOnHeapAggregatable retVal = rac.as(FramedOnHeapAggregatable.class);
if (retVal == null) {
retVal = new DefaultFramedOnHeapAggregatable(RowsAndColumns.expectAppendable(rac));
}
return retVal;
}
/**
* Aggregates the data according to the {@link WindowFrame} using the {@code AggregatorFactory} objects provided.
*
* @param frame window frame definition
* @param aggFactories definition of aggregations to be done
* @return a RowsAndColumns that contains columns representing the results of the aggregation
* from the AggregatorFactories
*/
RowsAndColumns aggregateAll(WindowFrame frame, AggregatorFactory[] aggFactories);
}
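
For reference, a sketch of how this interface is consumed, mirroring WindowFramedAggregateProcessor.process above; the frame ("2 preceding through the current row") and the "runningSum"/"dblCol" aggregation are illustrative placeholders:

FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
RowsAndColumns aggregated = agger.aggregateAll(
    new WindowFrame(WindowFrame.PeerType.ROWS, false, 2, false, 0),
    new AggregatorFactory[]{new DoubleSumAggregatorFactory("runningSum", "dblCol")}
);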

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.query.rowsandcols.semantic;
import java.util.List;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.aggregation.AggregatorFactory;

View File

@ -17,7 +17,9 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.ArrayList;
import java.util.List;

View File

@ -23,9 +23,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Assert;
import org.junit.Test;

View File

@ -21,9 +21,9 @@ package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Assert;
import org.junit.Test;

View File

@ -21,8 +21,8 @@ package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Assert;
import org.junit.Test;

View File

@ -21,9 +21,9 @@ package org.apache.druid.query.operator;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Assert;
import org.junit.Test;

View File

@ -96,6 +96,13 @@ public class RowsAndColumnsHelper
return this;
}
public RowsAndColumnsHelper expectColumn(String col, Object[] expectedVals, ColumnType type)
{
final ColumnHelper helper = columnHelper(col, expectedVals.length, type);
helper.setExpectation(expectedVals);
return this;
}
public ColumnHelper columnHelper(String column, int expectedSize, ColumnType expectedType)
{
ColumnHelper retVal = helpers.get(column);
@ -216,51 +223,43 @@ public class RowsAndColumnsHelper
Assert.assertTrue(msg, accessor.isNull(i));
Assert.assertNull(msg, accessor.getObject(i));
}
Assert.assertEquals(msg + " is null?", expectedNulls[i], accessor.isNull(i));
if (expectedVal instanceof Float) {
if (expectedNulls[i]) {
Assert.assertTrue(msg, accessor.isNull(i));
Assert.assertEquals(msg, 0.0f, accessor.getFloat(i), 0.0);
} else {
Assert.assertFalse(msg, accessor.isNull(i));
Assert.assertEquals(msg, (Float) expectedVal, accessor.getFloat(i), 0.0);
}
} else if (expectedVal instanceof Double) {
if (expectedNulls[i]) {
Assert.assertTrue(msg, accessor.isNull(i));
Assert.assertEquals(msg, 0.0d, accessor.getDouble(i), 0.0);
} else {
Assert.assertFalse(msg, accessor.isNull(i));
Assert.assertEquals(msg, (Double) expectedVal, accessor.getDouble(i), 0.0);
}
} else if (expectedVal instanceof Integer) {
if (expectedNulls[i]) {
Assert.assertTrue(msg, accessor.isNull(i));
Assert.assertEquals(msg, 0, accessor.getInt(i));
} else {
Assert.assertFalse(msg, accessor.isNull(i));
Assert.assertEquals(msg, ((Integer) expectedVal).intValue(), accessor.getInt(i));
}
} else if (expectedVal instanceof Long) {
if (expectedNulls[i]) {
Assert.assertTrue(msg, accessor.isNull(i));
Assert.assertEquals(msg, 0, accessor.getLong(i));
} else {
Assert.assertFalse(msg, accessor.isNull(i));
Assert.assertEquals(msg, ((Long) expectedVal).longValue(), accessor.getLong(i));
}
} else {
if (expectedNulls[i]) {
Assert.assertTrue(msg, accessor.isNull(i));
Assert.assertNull(msg, accessor.getObject(i));
// asserting null on the expected value is here for consistency in the tests. If it fails, it's more
// likely indicative of something wrong with the test setup than the actual logic; we keep it for
// sanity's sake to keep things consistent.
Assert.assertNull(msg, expectedVals[i]);
Assert.assertNull(msg, expectedVal);
} else {
final Object obj = accessor.getObject(i);
Assert.assertFalse(msg, accessor.isNull(i));
Assert.assertNotNull(msg, obj);
Assert.assertEquals(msg, expectedVals[i], obj);
Assert.assertEquals(msg, expectedVal, obj);
}
}
}

View File

@ -25,12 +25,12 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator.window;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.rowsandcols.AsOnlyTestRowsAndColumns;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.NoAsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
@SuppressWarnings("unchecked")
public class WindowFramedAggregateProcessorTest
{
@Test
public void testIsPassThruWhenRACReturnsSemanticInterface()
{
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0);
final AggregatorFactory[] theAggs = {
new LongMaxAggregatorFactory("cummMax", "intCol"),
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
};
WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs);
final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of(
"yay", new IntArrayColumn(new int[]{1, 2, 3})
));
final RowsAndColumns processed = proc.process(new AsOnlyTestRowsAndColumns(theFrame, theAggs, rac)
{
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
Assert.assertEquals(clazz, FramedOnHeapAggregatable.class);
return (T) (FramedOnHeapAggregatable) (frame, aggFactories) -> {
Assert.assertEquals(theFrame, frame);
Assert.assertArrayEquals(theAggs, aggFactories);
return rac;
};
}
});
Assert.assertSame(rac, processed);
}
@Test
public void testDoesStuffWhenNoSemanticInterfacesAvailable()
{
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0);
final AggregatorFactory[] theAggs = {
new LongSumAggregatorFactory("sum", "intCol")
};
WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs);
final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of(
"intCol", new IntArrayColumn(new int[]{1, 2, 3})
));
final RowsAndColumns processed = proc.process(new NoAsRowsAndColumns(rac));
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{1, 2, 3})
.expectColumn("sum", new int[]{1, 3, 6})
.allColumnsRegistered()
.validate(processed);
}
}

View File

@ -21,10 +21,10 @@ package org.apache.druid.query.operator.window.ranking;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Test;
import java.util.Collections;

View File

@ -21,10 +21,10 @@ package org.apache.druid.query.operator.window.ranking;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Test;
import java.util.Collections;

View File

@ -22,12 +22,12 @@ package org.apache.druid.query.operator.window.ranking;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;

View File

@ -22,10 +22,10 @@ package org.apache.druid.query.operator.window.ranking;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Test;
import java.util.Collections;

View File

@ -21,12 +21,12 @@ package org.apache.druid.query.operator.window.ranking;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;

View File

@ -21,12 +21,12 @@ package org.apache.druid.query.operator.window.value;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;

View File

@ -21,12 +21,12 @@ package org.apache.druid.query.operator.window.value;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;

View File

@ -21,12 +21,12 @@ package org.apache.druid.query.operator.window.value;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;

View File

@ -21,12 +21,12 @@ package org.apache.druid.query.operator.window.value;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;

View File

@ -21,17 +21,21 @@ package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.function.Function;
public class ArrayListRowsAndColumnsTest extends RowsAndColumnsTestBase<ArrayListRowsAndColumns<Object[]>>
public class ArrayListRowsAndColumnsTest extends RowsAndColumnsTestBase
{
@Override
public ArrayListRowsAndColumns<Object[]> makeRowsAndColumns(MapOfColumnsRowsAndColumns input)
public ArrayListRowsAndColumnsTest()
{
super(ArrayListRowsAndColumns.class);
}
@Nonnull
public static Function<MapOfColumnsRowsAndColumns, ArrayListRowsAndColumns<Object[]>> MAKER = input -> {
ArrayList<Object[]> rows = new ArrayList<>(input.numRows());
ArrayList<String> cols = new ArrayList<>(input.getColumnNames());
@ -62,5 +66,5 @@ public class ArrayListRowsAndColumnsTest extends RowsAndColumnsTestBase<ArrayLis
},
sigBob.build()
);
}
};
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.column.Column;
import java.util.Collection;
public abstract class AsOnlyTestRowsAndColumns implements RowsAndColumns
{
private final WindowFrame theFrame;
private final AggregatorFactory[] theAggs;
private final MapOfColumnsRowsAndColumns rac;
public AsOnlyTestRowsAndColumns(
WindowFrame theFrame,
AggregatorFactory[] theAggs,
MapOfColumnsRowsAndColumns rac
)
{
this.theFrame = theFrame;
this.theAggs = theAggs;
this.rac = rac;
}
@Override
public Collection<String> getColumnNames()
{
throw new UOE("not called");
}
@Override
public int numRows()
{
throw new UOE("not called");
}
@Override
public Column findColumn(String name)
{
throw new UOE("not called");
}
}

View File

@ -21,18 +21,16 @@ package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class MapOfColumnsRowsAndColumnsTest extends RowsAndColumnsTestBase<MapOfColumnsRowsAndColumns>
public class MapOfColumnsRowsAndColumnsTest extends RowsAndColumnsTestBase
{
@Override
public MapOfColumnsRowsAndColumns makeRowsAndColumns(MapOfColumnsRowsAndColumns input)
public MapOfColumnsRowsAndColumnsTest()
{
return input;
super(MapOfColumnsRowsAndColumns.class);
}
@Test

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.rowsandcols.column.Column;
import javax.annotation.Nullable;
import java.util.Collection;
public class NoAsRowsAndColumns implements RowsAndColumns
{
private final RowsAndColumns rac;
public NoAsRowsAndColumns(RowsAndColumns rac)
{
this.rac = rac;
}
@Override
public Collection<String> getColumnNames()
{
return rac.getColumnNames();
}
@Override
public int numRows()
{
return rac.numRows();
}
@Override
public Column findColumn(String name)
{
return rac.findColumn(name);
}
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
// Pretend like this doesn't implement any semantic interfaces
return null;
}
}

View File

@ -19,170 +19,85 @@
package org.apache.druid.query.rowsandcols;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.frame.AppendableMapOfColumns;
import org.apache.druid.query.rowsandcols.frame.MapOfColumnsRowsAndColumns;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
* This base class is intended to serve as a common set of tests to validate specific RowsAndColumns implementations.
* This test base exists to enable testing of RowsAndColumns objects. When an implementation adds itself to this test
* it will automatically be tested against every semantic interface that also participates in this test suite (should
* be all of them).
* <p>
* Different RowsAndColumns implementations will implement different subsets of the semantic interfaces, so this base class
* should test all of the possible semantic interfaces that can be implemented. By doing it this way, we can ensure that
* new RowsAndColumns implementations meet all of the corner cases and other issues that have been previously found.
* These test suites are combined a bit precariously, so there is work that the developer needs to do to make sure
* that things are wired up correctly. Specifically, a developer must register their RowsAndColumns implementation
* by adding an entry to the static {@link #getMakers()} method on this base class. The developer should *also*
* create a test class for their RowsAndColumns object that extends this class. By creating the test class that
* extends this class, there will be an extra validation done that ensures that the list of makers includes their
* RowsAndColumns class.
* <p>
* It is expected that this base class is going to grow quite large. As it gets extra large, we could perhaps look
* into whether one of the JUnit test runners could allow us to further sub-divide the test functionality into
* semantic-interface-specific tests. The ultimate goal, however, should be that a new RowsAndColumns implementation
* can very simply take advantage of all of the tests by implementing the abstract
* {@link #makeRowsAndColumns(MapOfColumnsRowsAndColumns)} method and be done.
*
* @param <T>
* The semantic interfaces, on the other hand, should all create a test that extends
* {@link org.apache.druid.query.rowsandcols.semantic.SemanticTestBase}. That test sets up a parameterized test,
* using the results of {@link #getMakers()} to do the parameterization.
*/
public abstract class RowsAndColumnsTestBase<T extends RowsAndColumns>
public abstract class RowsAndColumnsTestBase
{
static {
NullHandling.initializeForTests();
}
public abstract T makeRowsAndColumns(MapOfColumnsRowsAndColumns input);
private final Class<?> expectedClass;
@Test
public void testDefaultSortedGroupPartitioner()
private static final AtomicReference<Iterable<Object[]>> MAKERS = new AtomicReference<>();
public static Iterable<Object[]> getMakers()
{
T rac = makeRowsAndColumns(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
));
Iterable<Object[]> retVal = MAKERS.get();
if (retVal == null) {
retVal = Lists.newArrayList(
new Object[]{MapOfColumnsRowsAndColumns.class, Function.identity()},
new Object[]{ArrayListRowsAndColumns.class, ArrayListRowsAndColumnsTest.MAKER}
);
for (Object[] objects : retVal) {
Class<?> aClazz = (Class<?>) objects[0];
final String expectedName = aClazz.getName() + "Test";
try {
final Class<?> testClass = Class.forName(expectedName);
if (!RowsAndColumnsTestBase.class.isAssignableFrom(testClass)) {
throw new ISE("testClass[%s] doesn't extend RowsAndColumnsTestBase, please extend it.", testClass);
}
}
catch (ClassNotFoundException e) {
throw new ISE("aClazz[%s] didn't have test class[%s], please make it", aClazz, expectedName);
}
}
validateSortedGroupPartitioner("default", new DefaultSortedGroupPartitioner(rac));
SortedGroupPartitioner specialized = rac.as(SortedGroupPartitioner.class);
if (specialized != null) {
validateSortedGroupPartitioner("specialized", specialized);
MAKERS.set(retVal);
}
return retVal;
}
private void validateSortedGroupPartitioner(String name, SortedGroupPartitioner parter)
public RowsAndColumnsTestBase(
Class<?> expectedClass
)
{
int[] expectedBounds = new int[]{0, 3, 5, 6, 9};
List<RowsAndColumnsHelper> expectations = Arrays.asList(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{2})
.expectColumn("unsorted", new int[]{54})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{4, 4, 4})
.expectColumn("unsorted", new int[]{2, 3, 92})
.allColumnsRegistered()
);
final List<String> partCols = Collections.singletonList("sorted");
Assert.assertArrayEquals(name, expectedBounds, parter.computeBoundaries(partCols));
final Iterator<RowsAndColumns> partedChunks = parter.partitionOnBoundaries(partCols).iterator();
for (RowsAndColumnsHelper expectation : expectations) {
Assert.assertTrue(name, partedChunks.hasNext());
expectation.validate(name, partedChunks.next());
}
Assert.assertFalse(name, partedChunks.hasNext());
boolean exceptionThrown = false;
try {
parter.partitionOnBoundaries(Collections.singletonList("unsorted"));
}
catch (ISE ex) {
Assert.assertEquals("Pre-sorted data required, rows[1] and [2] were not in order", ex.getMessage());
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
this.expectedClass = expectedClass;
}
@Test
public void testOnHeapAggregatable()
public void testInListOfMakers()
{
T rac = makeRowsAndColumns(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"incremented", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
"zeroesOut", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
)
));
validateOnHeapAggregatable("default", new DefaultOnHeapAggregatable(rac));
OnHeapAggregatable specialized = rac.as(OnHeapAggregatable.class);
if (specialized != null) {
validateOnHeapAggregatable("specialized", specialized);
boolean inList = false;
for (Object[] objs : getMakers()) {
if (expectedClass.equals(objs[0])) {
inList = true;
break;
}
}
}
private void validateOnHeapAggregatable(String name, OnHeapAggregatable agger)
{
final ArrayList<Object> results = agger.aggregateAll(Arrays.asList(
new LongSumAggregatorFactory("incremented", "incremented"),
new LongMaxAggregatorFactory("zeroesOutMax", "zeroesOut"),
new LongMinAggregatorFactory("zeroesOutMin", "zeroesOut")
));
Assert.assertEquals(name, 3, results.size());
Assert.assertEquals(name, 55L, results.get(0));
Assert.assertEquals(name, 82L, results.get(1));
Assert.assertEquals(name, -90L, results.get(2));
}
@Test
public void testAppendableRowsAndColumns()
{
T rac = makeRowsAndColumns(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
"colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
)
));
validateAppendableRowsAndColumns("default", new AppendableMapOfColumns(rac));
AppendableRowsAndColumns specialized = rac.as(AppendableRowsAndColumns.class);
if (specialized != null) {
validateAppendableRowsAndColumns("specialized", specialized);
}
}
public void validateAppendableRowsAndColumns(String name, AppendableRowsAndColumns appender)
{
appender.addColumn("newCol", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
new RowsAndColumnsHelper()
.expectColumn("colA", new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.expectColumn("colB", new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
.expectColumn("newCol", new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.allColumnsRegistered()
.validate(name, appender);
Assert.assertTrue(inList);
}
}
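To make the registration contract described in the Javadoc concrete, this is roughly what wiring up a new implementation would look like; MyRowsAndColumns, its constructor, and MyRowsAndColumnsTest.MAKER are hypothetical placeholders, following the existing ArrayListRowsAndColumnsTest pattern:
// 1) Add an entry for the hypothetical implementation to getMakers():
new Object[]{MyRowsAndColumns.class, MyRowsAndColumnsTest.MAKER}
// 2) Create the correspondingly named test class so that testInListOfMakers()
//    and the Class.forName() check above both pass:
public class MyRowsAndColumnsTest extends RowsAndColumnsTestBase
{
  // MAKER builds the implementation under test from a MapOfColumnsRowsAndColumns;
  // the constructor used here is a placeholder.
  public static final Function<MapOfColumnsRowsAndColumns, MyRowsAndColumns> MAKER =
      input -> new MyRowsAndColumns(input);

  public MyRowsAndColumnsTest()
  {
    super(MyRowsAndColumns.class);
  }
}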

View File

@ -39,7 +39,7 @@ public class NullColumnAccessorTest
Assert.assertEquals(0.0, accessor.getFloat(i), 0);
Assert.assertEquals(0.0, accessor.getDouble(i), 0);
for (int j = 0; j < i; ++j) {
Assert.assertEquals(0, accessor.compareCells(j, i));
Assert.assertEquals(0, accessor.compareRows(j, i));
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Test;
import java.util.function.Function;
public class AppendableRowsAndColumnsTest extends SemanticTestBase
{
public AppendableRowsAndColumnsTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
super(name, fn);
}
@Test
public void testAppendableRowsAndColumns()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
"colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
)
));
AppendableRowsAndColumns appender = RowsAndColumns.expectAppendable(rac);
appender.addColumn("newCol", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
new RowsAndColumnsHelper()
.expectColumn("colA", new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.expectColumn("colB", new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
.expectColumn("newCol", new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.allColumnsRegistered()
.validate(appender);
}
}

View File

@ -0,0 +1,459 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Test;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
public class FramedOnHeapAggregatableTest extends SemanticTestBase
{
public FramedOnHeapAggregatableTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
super(name, fn);
}
@Test
public void testWindowedAggregationWindowSmallerThanRowsNoOffsets()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 0),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("maxFromInt", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationWindowSmallerThanRows()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 2),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{3, 6, 10, 14, 18, 22, 26, 30, 24, 17})
.expectColumn("maxFromInt", new double[]{2, 3, 4, 5, 6, 7, 8, 9, 9, 9})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationWindowSmallerThanRowsOnlyUpper()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 2),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{3, 6, 9, 12, 15, 18, 21, 24, 17, 9})
.expectColumn("maxFromInt", new double[]{2, 3, 4, 5, 6, 7, 8, 9, 9, 9})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationWindowSmallerThanRowsOnlyLower()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 2, false, 0),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{0, 1, 3, 6, 9, 12, 15, 18, 21, 24})
.expectColumn("maxFromInt", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationWindowLargerThanRows()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 7),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new LongMinAggregatorFactory("longMin", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{28, 36, 45, 45, 45, 45, 45, 44, 42, 39})
.expectColumn("maxFromInt", new double[]{7, 8, 9, 9, 9, 9, 9, 9, 9, 9})
.expectColumn("longMin", new long[]{0, 0, 0, 0, 0, 0, 1, 2, 3, 4})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationLowerLargerThanRows()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 1),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new LongMinAggregatorFactory("longMin", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2})
.expectColumn("sumFromLong", new long[]{1, 3, 3})
.expectColumn("maxFromInt", new double[]{1, 2, 2})
.expectColumn("longMin", new long[]{0, 0, 0})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationLowerLargerThanRowsNoUpper()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new LongMinAggregatorFactory("longMin", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2})
.expectColumn("sumFromLong", new long[]{0, 1, 3})
.expectColumn("maxFromInt", new double[]{0, 1, 2})
.expectColumn("longMin", new long[]{0, 0, 0})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationUpperLargerThanRows()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 7),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new LongMinAggregatorFactory("longMin", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2})
.expectColumn("sumFromLong", new long[]{3, 3, 3})
.expectColumn("maxFromInt", new double[]{2, 2, 2})
.expectColumn("longMin", new long[]{0, 0, 1})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationUpperLargerThanRowsNoLower()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new LongMinAggregatorFactory("longMin", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2})
.expectColumn("sumFromLong", new long[]{3, 3, 2})
.expectColumn("maxFromInt", new double[]{2, 2, 2})
.expectColumn("longMin", new long[]{0, 1, 2})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationWindowLargerThanRowsOnlyUpper()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new LongMinAggregatorFactory("longMin", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{28, 36, 44, 42, 39, 35, 30, 24, 17, 9})
.expectColumn("maxFromInt", new double[]{7, 8, 9, 9, 9, 9, 9, 9, 9, 9})
.expectColumn("longMin", new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testWindowedAggregationWindowLargerThanRowsOnlyLower()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new LongMinAggregatorFactory("longMin", "intCol"),
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{0, 1, 3, 6, 10, 15, 21, 27, 33, 39})
.expectColumn("maxFromInt", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("longMin", new long[]{0, 0, 0, 0, 0, 0, 1, 2, 3, 4})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testUnboundedWindowedAggregation()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("doubleCol", new DoubleArrayColumn(new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("objectCol", new ObjectArrayColumn(
new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"},
ColumnType.STRING
)
);
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new LongSumAggregatorFactory("sumFromDouble", "doubleCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new DoubleMaxAggregatorFactory("maxFromDouble", "doubleCol")
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("objectCol", new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}, ColumnType.STRING)
.expectColumn("sumFromLong", new long[]{45, 45, 45, 45, 45, 45, 45, 45, 45, 45})
.expectColumn("sumFromDouble", new long[]{45, 45, 45, 45, 45, 45, 45, 45, 45, 45})
.expectColumn("maxFromInt", new double[]{9, 9, 9, 9, 9, 9, 9, 9, 9, 9})
.expectColumn("maxFromDouble", new double[]{9, 9, 9, 9, 9, 9, 9, 9, 9, 9})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testCumulativeAggregation()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("doubleCol", new DoubleArrayColumn(new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("objectCol", new ObjectArrayColumn(
new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"},
ColumnType.STRING
)
);
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0),
new AggregatorFactory[]{
new LongMaxAggregatorFactory("cummMax", "intCol"),
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("objectCol", new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}, ColumnType.STRING)
.expectColumn("cummMax", new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("cummSum", new double[]{0, 1, 3, 6, 10, 15, 21, 28, 36, 45})
.allColumnsRegistered()
.validate(results);
}
@Test
public void testReverseCumulativeAggregation()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("doubleCol", new DoubleArrayColumn(new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("objectCol", new ObjectArrayColumn(
new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"},
ColumnType.STRING
)
);
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, true, 0),
new AggregatorFactory[]{
new LongMaxAggregatorFactory("cummMax", "intCol"),
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
}
);
new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("objectCol", new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}, ColumnType.STRING)
.expectColumn("cummMax", new long[]{9, 9, 9, 9, 9, 9, 9, 9, 9, 9})
.expectColumn("cummSum", new double[]{45, 45, 44, 42, 39, 35, 30, 24, 17, 9})
.allColumnsRegistered()
.validate(results);
}
}
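For readers mapping these tests back to SQL, the WindowFrame constructor arguments used above appear to correspond to frame bounds as follows; this reading is inferred from the expected outputs of the tests, since WindowFrame itself is not shown in this excerpt:
// new WindowFrame(peerType, lowerUnbounded, lowerOffset, upperUnbounded, upperOffset)
new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0);   // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0);  // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (cumulative)
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, true, 0);  // ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING (reverse cumulative)
new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 2); // ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING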

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.function.Function;
public class OnHeapAggregatableTest extends SemanticTestBase
{
public OnHeapAggregatableTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
super(name, fn);
}
@Test
public void testOnHeapAggregatable()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"incremented", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
"zeroesOut", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
)
));
OnHeapAggregatable agger = rac.as(OnHeapAggregatable.class);
if (agger == null) {
agger = new DefaultOnHeapAggregatable(rac);
}
final ArrayList<Object> results = agger.aggregateAll(Arrays.asList(
new LongSumAggregatorFactory("incremented", "incremented"),
new LongMaxAggregatorFactory("zeroesOutMax", "zeroesOut"),
new LongMinAggregatorFactory("zeroesOutMin", "zeroesOut")
));
Assert.assertEquals(3, results.size());
Assert.assertEquals(55L, results.get(0));
Assert.assertEquals(82L, results.get(1));
Assert.assertEquals(-90L, results.get(2));
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.FluentIterable;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.NoAsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumnsTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.function.Function;
/**
* This base class exists to provide standard parameterization for Semantic interfaces. The idea is that the test
* will be fed a function that can be used to build a RowsAndColumns and then the test should do whatever it
* needs with the RowsAndColumns. By extending this base class, the test will end up running against every
* independent implementation of RowsAndColumns that has been registered with {@link RowsAndColumnsTestBase}.
*/
@RunWith(Parameterized.class)
public abstract class SemanticTestBase
{
static {
NullHandling.initializeForTests();
}
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> parameterFeed()
{
return FluentIterable.from(RowsAndColumnsTestBase.getMakers())
.transformAndConcat(input -> {
final String name = ((Class<?>) input[0]).getSimpleName();
return Arrays.asList(
new Object[]{name, input[1]},
new Object[]{"NoAs-" + name, wrapNoAs(input[1])}
);
});
}
private final String name;
private final Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn;
public SemanticTestBase(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
this.name = name;
this.fn = fn;
}
public RowsAndColumns make(MapOfColumnsRowsAndColumns rac)
{
try {
return fn.apply(rac);
}
catch (RuntimeException e) {
throw new RE(e, "using name[%s]", name);
}
}
@SuppressWarnings("unchecked")
private static Function<MapOfColumnsRowsAndColumns, RowsAndColumns> wrapNoAs(Object obj)
{
return ((Function<MapOfColumnsRowsAndColumns, RowsAndColumns>) obj).andThen(NoAsRowsAndColumns::new);
}
}
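A test for a semantic interface then only needs the parameterized constructor pass-through and a call to make(); the concrete tests in this change (OnHeapAggregatableTest, AppendableRowsAndColumnsTest, and the others below) all follow this shape, sketched here with a hypothetical class name and column:
public class MySemanticInterfaceTest extends SemanticTestBase
{
  public MySemanticInterfaceTest(String name, Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn)
  {
    super(name, fn);
  }

  @Test
  public void testAgainstEveryRegisteredImplementation()
  {
    RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
        ImmutableMap.of("col", new IntArrayColumn(new int[]{1, 2, 3}))
    ));
    // exercise the semantic interface under test against rac here
    Assert.assertEquals(3, rac.numRows());
  }
}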

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
public class SortedGroupPartitionerTest extends SemanticTestBase
{
public SortedGroupPartitionerTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
super(name, fn);
}
@Test
public void testDefaultSortedGroupPartitioner()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
));
SortedGroupPartitioner parter = rac.as(SortedGroupPartitioner.class);
if (parter == null) {
parter = new DefaultSortedGroupPartitioner(rac);
}
int[] expectedBounds = new int[]{0, 3, 5, 6, 9};
List<RowsAndColumnsHelper> expectations = Arrays.asList(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{2})
.expectColumn("unsorted", new int[]{54})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{4, 4, 4})
.expectColumn("unsorted", new int[]{2, 3, 92})
.allColumnsRegistered()
);
final List<String> partCols = Collections.singletonList("sorted");
Assert.assertArrayEquals(expectedBounds, parter.computeBoundaries(partCols));
final Iterator<RowsAndColumns> partedChunks = parter.partitionOnBoundaries(partCols).iterator();
for (RowsAndColumnsHelper expectation : expectations) {
Assert.assertTrue(partedChunks.hasNext());
expectation.validate(partedChunks.next());
}
Assert.assertFalse(partedChunks.hasNext());
boolean exceptionThrown = false;
try {
parter.partitionOnBoundaries(Collections.singletonList("unsorted"));
}
catch (ISE ex) {
Assert.assertEquals("Pre-sorted data required, rows[1] and [2] were not in order", ex.getMessage());
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
}
}

View File

@ -27,8 +27,10 @@ import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
@ -36,7 +38,8 @@ import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorFactory;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.WindowAggregateProcessor;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.operator.window.WindowFramedAggregateProcessor;
import org.apache.druid.query.operator.window.ranking.WindowCumeDistProcessor;
import org.apache.druid.query.operator.window.ranking.WindowDenseRankProcessor;
import org.apache.druid.query.operator.window.ranking.WindowPercentileProcessor;
@ -71,11 +74,17 @@ public class Windowing
.put("LEAD", (agg) -> new WindowOffsetProcessor(agg.getColumn(0), agg.getOutputName(), agg.getConstantInt(1)))
.put("FIRST_VALUE", (agg) -> new WindowFirstProcessor(agg.getColumn(0), agg.getOutputName()))
.put("LAST_VALUE", (agg) -> new WindowLastProcessor(agg.getColumn(0), agg.getOutputName()))
.put("CUME_DIST", (agg) -> new WindowCumeDistProcessor(agg.getOrderingColumns(), agg.getOutputName()))
.put("DENSE_RANK", (agg) -> new WindowDenseRankProcessor(agg.getOrderingColumns(), agg.getOutputName()))
.put("CUME_DIST", (agg) -> new WindowCumeDistProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName()))
.put(
"DENSE_RANK",
(agg) -> new WindowDenseRankProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName())
)
.put("NTILE", (agg) -> new WindowPercentileProcessor(agg.getOutputName(), agg.getConstantInt(0)))
.put("PERCENT_RANK", (agg) -> new WindowRankProcessor(agg.getOrderingColumns(), agg.getOutputName(), true))
.put("RANK", (agg) -> new WindowRankProcessor(agg.getOrderingColumns(), agg.getOutputName(), false))
.put(
"PERCENT_RANK",
(agg) -> new WindowRankProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName(), true)
)
.put("RANK", (agg) -> new WindowRankProcessor(agg.getGroup().getOrderingColumns(), agg.getOutputName(), false))
.put("ROW_NUMBER", (agg) -> new WindowRowNumberProcessor(agg.getOutputName()))
.build();
private final List<OperatorFactory> ops;
@ -90,43 +99,23 @@ public class Windowing
{
final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");
// TODO(gianm): insert sorts and split the groups up at the rule stage; by this time, we assume there's one
// window and the dataset is already sorted appropriately.
// Right now, we assume that there is only a single grouping as our code cannot handle re-sorting and
// re-partitioning. As we relax that restriction, we will be able to plan multiple different groupings.
if (window.groups.size() != 1) {
plannerContext.setPlanningError("Multiple windows are not supported");
throw new CannotBuildQueryException(window);
}
final Window.Group group = Iterables.getOnlyElement(window.groups);
final WindowGroup group = new WindowGroup(window, Iterables.getOnlyElement(window.groups), rowSignature);
// Window.
// TODO(gianm): Validate order-by keys instead of ignoring them.
// Presently, the order-by keys are not validated to ensure that the incoming query has pre-sorted the data
// as required by the window query. This should be done. To do it, we will need to know what the sub-query
// we are running against actually looks like, so that we can validate that the data will come back in the
// expected order...
final List<String> partitionColumns = new ArrayList<>();
for (int groupKey : group.keys) {
partitionColumns.add(rowSignature.getColumnName(groupKey));
}
// Frame.
// TODO(gianm): Validate ROWS vs RANGE instead of ignoring it.
// TODO(gianm): Support various other kinds of frames.
if (!group.lowerBound.isUnbounded()) {
plannerContext.setPlanningError("Lower bound [%s] is not supported", group.upperBound);
throw new CannotBuildQueryException(window);
}
final boolean cumulative;
if (group.upperBound.isUnbounded()) {
cumulative = false;
} else if (group.upperBound.isCurrentRow()) {
cumulative = true;
} else {
plannerContext.setPlanningError("Upper bound [%s] is not supported", group.upperBound);
throw new CannotBuildQueryException(window);
}
// Aggregations.
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", rowSignature.getColumnNames());
final List<AggregateCall> aggregateCalls = group.getAggregateCalls(window);
final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
final List<Processor> processors = new ArrayList<>();
final List<AggregatorFactory> aggregations = new ArrayList<>();
@ -149,7 +138,7 @@ public class Windowing
Collections.emptyList(),
aggName,
aggCall,
false // TODO: finalize in a separate operator
false // Windowed aggregations don't currently finalize. This means that sketches won't work as expected.
);
if (aggregation == null
@ -178,11 +167,12 @@ public class Windowing
}
if (!aggregations.isEmpty()) {
if (cumulative) {
processors.add(new WindowAggregateProcessor(null, aggregations));
} else {
processors.add(new WindowAggregateProcessor(aggregations, null));
}
processors.add(
new WindowFramedAggregateProcessor(
group.getWindowFrame(),
aggregations.toArray(new AggregatorFactory[0])
)
);
}
if (processors.isEmpty()) {
@ -190,7 +180,7 @@ public class Windowing
}
final List<OperatorFactory> ops = Arrays.asList(
new NaivePartitioningOperatorFactory(partitionColumns),
new NaivePartitioningOperatorFactory(group.getPartitionColumns()),
new WindowOperatorFactory(
processors.size() == 1 ?
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
@ -229,6 +219,68 @@ public class Windowing
Processor make(WindowAggregate agg);
}
private static class WindowGroup
{
private final Window window;
private final RowSignature sig;
private final Window.Group group;
public WindowGroup(Window window, Window.Group group, RowSignature sig)
{
this.window = window;
this.sig = sig;
this.group = group;
}
public ArrayList<String> getPartitionColumns()
{
final ArrayList<String> retVal = new ArrayList<>();
for (int groupKey : group.keys) {
retVal.add(sig.getColumnName(groupKey));
}
return retVal;
}
public ArrayList<String> getOrderingColumns()
{
final List<RelFieldCollation> fields = group.orderKeys.getFieldCollations();
ArrayList<String> retVal = new ArrayList<>(fields.size());
for (RelFieldCollation field : fields) {
retVal.add(sig.getColumnName(field.getFieldIndex()));
}
return retVal;
}
public List<AggregateCall> getAggregateCalls()
{
return group.getAggregateCalls(window);
}
public WindowFrame getWindowFrame()
{
return new WindowFrame(
WindowFrame.PeerType.ROWS,
group.lowerBound.isUnbounded(),
figureOutOffset(group.lowerBound),
group.upperBound.isUnbounded(),
figureOutOffset(group.upperBound)
);
}
private int figureOutOffset(RexWindowBound bound)
{
if (bound.isUnbounded() || bound.isCurrentRow()) {
return 0;
}
return getConstant(((RexInputRef) bound.getOffset()).getIndex());
}
private int getConstant(int refIndex)
{
return ((Number) window.constants.get(refIndex - sig.size()).getValue()).intValue();
}
}
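One non-obvious detail in WindowGroup above: Calcite's Window rel hands frame offsets back as RexInputRef values whose index points past the input row's columns and into window.constants, which is why getConstant subtracts sig.size(). A minimal, illustrative sketch with made-up numbers; the class name and values below are hypothetical and not taken from this commit:
import java.util.Arrays;
import java.util.List;
class ConstantIndexSketch
{
  public static void main(String[] args)
  {
    // Hypothetical setup: the input row has 3 columns, so constant references start at index 3,
    // and the window's constant pool holds two offset literals.
    int signatureSize = 3;
    List<Integer> constants = Arrays.asList(2, 5);
    // A bound such as "5 FOLLOWING" would arrive as a RexInputRef with index 4; subtracting the
    // signature size lands on the second entry of the constant pool.
    int boundRefIndex = 4;
    int offset = constants.get(boundRefIndex - signatureSize);
    System.out.println(offset); // prints 5
  }
}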
private static class WindowAggregate
{
private final String outputName;
@ -237,7 +289,7 @@ public class Windowing
private final PlannerContext context;
private final Project project;
private final List<RexLiteral> constants;
private final Window.Group group;
private final WindowGroup group;
private WindowAggregate(
String outputName,
@ -246,7 +298,7 @@ public class Windowing
PlannerContext context,
Project project,
List<RexLiteral> constants,
Window.Group group
WindowGroup group
)
{
this.outputName = outputName;
@ -267,14 +319,9 @@ public class Windowing
return outputName;
}
public ArrayList<String> getOrderingColumns()
public WindowGroup getGroup()
{
final List<RelFieldCollation> fields = group.orderKeys.getFieldCollations();
ArrayList<String> retVal = new ArrayList<>(fields.size());
for (RelFieldCollation field : fields) {
retVal.add(sig.getColumnName(field.getFieldIndex()));
}
return retVal;
return group;
}
public String getColumn(int argPosition)
@ -286,8 +333,7 @@ public class Windowing
public RexLiteral getConstantArgument(int argPosition)
{
final Integer constantIndex = call.getArgList().get(argPosition) - sig.size();
return constants.get(constantIndex);
return constants.get(call.getArgList().get(argPosition) - sig.size());
}
public int getConstantInt(int argPosition)

View File

@ -24,9 +24,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.ImmutableMap;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
@ -36,10 +33,10 @@ import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
@ -48,11 +45,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.regex.Pattern;
@RunWith(JUnitParamsRunner.class)
/**
* These tests are file-based; look in resources -> calcite/tests/window for the set of test specifications.
*/
@RunWith(Parameterized.class)
public class CalciteWindowQueryTest extends BaseCalciteQueryTest
{
@ -66,10 +64,8 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
private static final ObjectMapper YAML_JACKSON = new DefaultObjectMapper(new YAMLFactory(), "tests");
private static final AtomicLong EXPECTED_TESTS = new AtomicLong();
private static final AtomicLong TEST_COUNTER = new AtomicLong();
public Object parametersForWindowQueryTest() throws Exception
@Parameterized.Parameters(name = "{0}")
public static Object parametersForWindowQueryTest() throws Exception
{
final URL windowFolderUrl = ClassLoader.getSystemResource("calcite/tests/window");
File windowFolder = new File(windowFolderUrl.toURI());
@ -77,31 +73,25 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
final File[] listedFiles = windowFolder.listFiles(
pathname -> pathname.getName().toLowerCase(Locale.ROOT).endsWith(".sqltest")
);
EXPECTED_TESTS.set(listedFiles.length);
Pattern matcher = Pattern.compile(".*");
return Arrays
.stream(Objects.requireNonNull(listedFiles))
.map(File::getName)
.filter(matcher.asPredicate())
.toArray();
}
@AfterClass
public static void testRanAllTests()
private final String filename;
public CalciteWindowQueryTest(
String filename
)
{
// This validation exists to catch issues with the filter Pattern accidentally getting checked in. It validates
// that we ran all of the tests from the directory. If this is failing, most likely, the filter Pattern in
// parametersForWindowQueryTest accidentally got checked in as something other than ".*"
Assert.assertEquals(EXPECTED_TESTS.get(), TEST_COUNTER.get());
this.filename = filename;
}
@Test
@Parameters(method = "parametersForWindowQueryTest")
@SuppressWarnings("unchecked")
@TestCaseName("{0}")
public void windowQueryTest(String filename) throws IOException
public void windowQueryTest() throws IOException
{
final Function<String, String> stringManipulator;
if (NullHandling.sqlCompatible()) {
@ -110,7 +100,6 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
stringManipulator = Function.identity();
}
TEST_COUNTER.incrementAndGet();
final URL systemResource = ClassLoader.getSystemResource("calcite/tests/window/" + filename);
final Object objectFromYaml = YAML_JACKSON.readValue(systemResource.openStream(), Object.class);
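The switch above replaces JUnitParams with JUnit 4's built-in Parameterized runner: the static @Parameterized.Parameters method supplies one constructor argument per .sqltest file, and name = "{0}" uses that argument as the displayed test name. A minimal, self-contained sketch of the same pattern; the class and file names here are invented for illustration:
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class FileDrivenTestSketch
{
  @Parameterized.Parameters(name = "{0}")
  public static Object[] parameters()
  {
    // In the real test these names come from listing the .sqltest files in calcite/tests/window.
    return new Object[]{"example1.sqltest", "example2.sqltest"};
  }
  private final String filename;
  public FileDrivenTestSketch(String filename)
  {
    this.filename = filename;
  }
  @Test
  public void runsOncePerFile()
  {
    Assert.assertTrue(filename.endsWith(".sqltest"));
  }
}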

View File

@ -1,25 +1,26 @@
type: "operatorValidation"
sql: "
SELECT
FLOOR(__time TO DAY) t,
SUM(cnt) c,
SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc
FROM foo
GROUP BY FLOOR(__time TO DAY)"
sql: |
SELECT
FLOOR(__time TO DAY) t,
SUM(cnt) c,
SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc
FROM foo
GROUP BY FLOOR(__time TO DAY)
expectedOperators:
- { type: "naivePartition", partitionColumns: [ ] }
- type: "window"
processor:
type: "aggregate"
cumulativeAggregations:
type: "framedAgg"
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: false, uppOffset: 0 }
aggregations:
- { type: "longSum", name: "w0", fieldName: "a0" }
expectedResults:
- [946684800000, 1, 1]
- [946771200000, 1, 2]
- [946857600000, 1, 3]
- [978307200000, 1, 4]
- [978393600000, 1, 5]
- [978480000000, 1, 6]
- [ 946684800000, 1, 1 ]
- [ 946771200000, 1, 2 ]
- [ 946857600000, 1, 3 ]
- [ 978307200000, 1, 4 ]
- [ 978393600000, 1, 5 ]
- [ 978480000000, 1, 6 ]
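Reading the expectedOperators block above against the planner changes earlier in this diff: it describes a naive partitioning over no columns followed by a window operator whose processor is a framed long-sum aggregation over a ROWS frame from unbounded preceding to the current row. A rough Java sketch of the equivalent construction, assuming LongSumAggregatorFactory's (name, fieldName) constructor; the class name is invented and the snippet is illustrative only, not part of this commit:
import java.util.Collections;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorFactory;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.operator.window.WindowFramedAggregateProcessor;
class ExpectedOperatorsSketch
{
  static OperatorFactory[] build()
  {
    // Matches frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: false, uppOffset: 0 }
    WindowFrame frame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0);
    return new OperatorFactory[]{
        // naivePartition with partitionColumns: [ ]
        new NaivePartitioningOperatorFactory(Collections.emptyList()),
        // window operator running a framedAgg processor with a longSum of a0 into w0
        new WindowOperatorFactory(
            new WindowFramedAggregateProcessor(
                frame,
                new AggregatorFactory[]{new LongSumAggregatorFactory("w0", "a0")}
            )
        )
    };
  }
}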

File diff suppressed because it is too large