mirror of https://github.com/apache/druid.git
Make WindowFrames more specific (#16741)
Changes the WindowFrame internals / representation a bit; introduces dedicated frametypes for rows and groups which corresponds to the implemented processing methods
This commit is contained in:
parent
ca787885c9
commit
7e3fab5bf9
|
@ -246,11 +246,8 @@ Druid has guardrail logic to prevent you from executing window function queries
|
|||
|
||||
For example:
|
||||
- You cannot set expressions as bounds for window frames.
|
||||
- You cannot use two FOLLOWING expressions in the window frame. For example: `ROWS BETWEEN 2 ROWS FOLLOWING and 3 ROWS FOLLOWING`.
|
||||
- You can only use a RANGE frames when both endpoints are unbounded or current row.
|
||||
|
||||
If you write a query that violates one of these conditions, Druid throws an error: "The query contains a window frame which may return incorrect results. To disregard this warning, set `windowingStrictValidation` to false in the query context."
|
||||
|
||||
## Window function reference
|
||||
|
||||
|Function|Notes|
|
||||
|
|
|
@ -106,7 +106,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.build();
|
||||
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "d0")
|
||||
};
|
||||
|
@ -196,7 +196,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.build();
|
||||
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "d1")
|
||||
};
|
||||
|
@ -306,7 +306,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.build();
|
||||
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "d1")
|
||||
};
|
||||
|
@ -419,7 +419,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.build();
|
||||
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "d0")
|
||||
};
|
||||
|
@ -523,7 +523,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.build();
|
||||
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "d0")
|
||||
};
|
||||
|
@ -589,7 +589,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -654,7 +654,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -725,7 +725,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -793,7 +793,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "d1")
|
||||
};
|
||||
|
@ -878,7 +878,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -951,7 +951,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -1028,17 +1028,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("m2", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(
|
||||
WindowFrame.PeerType.RANGE,
|
||||
true,
|
||||
0,
|
||||
false,
|
||||
0,
|
||||
ImmutableList.of(new ColumnWithDirection(
|
||||
"m1",
|
||||
ColumnWithDirection.Direction.ASC
|
||||
))
|
||||
);
|
||||
final WindowFrame theFrame = WindowFrame.forOrderBy("m1");
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -1142,14 +1132,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("m2", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(
|
||||
WindowFrame.PeerType.ROWS,
|
||||
true,
|
||||
0,
|
||||
true,
|
||||
0,
|
||||
null
|
||||
);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -1233,7 +1216,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.DOUBLE)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -1322,14 +1305,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("d3", ColumnType.STRING)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(
|
||||
WindowFrame.PeerType.ROWS,
|
||||
true,
|
||||
0,
|
||||
true,
|
||||
0,
|
||||
null
|
||||
);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -1412,14 +1388,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("d3", ColumnType.STRING)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(
|
||||
WindowFrame.PeerType.ROWS,
|
||||
true,
|
||||
0,
|
||||
true,
|
||||
0,
|
||||
null
|
||||
);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new DoubleSumAggregatorFactory("w0", "m1")
|
||||
};
|
||||
|
@ -1796,7 +1765,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.LONG)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new LongSumAggregatorFactory("w0", "added")
|
||||
};
|
||||
|
@ -1887,7 +1856,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.add("cc", ColumnType.LONG)
|
||||
.build();
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new LongSumAggregatorFactory("w0", "added")
|
||||
};
|
||||
|
@ -2001,7 +1970,7 @@ public class MSQWindowTest extends MSQTestBase
|
|||
.build();
|
||||
|
||||
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.unbounded();
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new LongSumAggregatorFactory("w0", "d1")
|
||||
};
|
||||
|
|
|
@ -21,159 +21,192 @@ package org.apache.druid.query.operator.window;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.druid.query.operator.ColumnWithDirection;
|
||||
import org.apache.druid.annotations.SubclassesMustOverrideEqualsAndHashCode;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class WindowFrame
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "rows", value = WindowFrame.Rows.class),
|
||||
@JsonSubTypes.Type(name = "groups", value = WindowFrame.Groups.class),
|
||||
})
|
||||
@SubclassesMustOverrideEqualsAndHashCode
|
||||
public interface WindowFrame
|
||||
{
|
||||
public static WindowFrame unbounded()
|
||||
static WindowFrame unbounded()
|
||||
{
|
||||
return new WindowFrame(PeerType.ROWS, true, 0, true, 0, null);
|
||||
return rows(null, null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public enum PeerType
|
||||
static Rows rows(Integer lowerOffset, Integer upperOffset)
|
||||
{
|
||||
ROWS,
|
||||
RANGE
|
||||
return new WindowFrame.Rows(lowerOffset, upperOffset);
|
||||
}
|
||||
|
||||
// Will likely need to add the order by columns to also be able to deal with RANGE peer type.
|
||||
private final PeerType peerType;
|
||||
private final boolean lowerUnbounded;
|
||||
private final int lowerOffset;
|
||||
private final boolean upperUnbounded;
|
||||
private final int upperOffset;
|
||||
private final List<ColumnWithDirection> orderBy;
|
||||
|
||||
@JsonCreator
|
||||
public WindowFrame(
|
||||
@JsonProperty("peerType") PeerType peerType,
|
||||
@JsonProperty("lowUnbounded") boolean lowerUnbounded,
|
||||
@JsonProperty("lowOffset") int lowerOffset,
|
||||
@JsonProperty("uppUnbounded") boolean upperUnbounded,
|
||||
@JsonProperty("uppOffset") int upperOffset,
|
||||
@JsonProperty("orderBy") List<ColumnWithDirection> orderBy
|
||||
)
|
||||
static Groups groups(Integer lowerOffset, Integer upperOffset, List<String> orderByColumns)
|
||||
{
|
||||
this.peerType = peerType;
|
||||
this.lowerUnbounded = lowerUnbounded;
|
||||
this.lowerOffset = lowerOffset;
|
||||
this.upperUnbounded = upperUnbounded;
|
||||
this.upperOffset = upperOffset;
|
||||
this.orderBy = orderBy;
|
||||
return new WindowFrame.Groups(lowerOffset, upperOffset, orderByColumns);
|
||||
}
|
||||
|
||||
@JsonProperty("peerType")
|
||||
public PeerType getPeerType()
|
||||
static WindowFrame forOrderBy(String... orderByColumns)
|
||||
{
|
||||
return peerType;
|
||||
return groups(null, 0, Lists.newArrayList(orderByColumns));
|
||||
}
|
||||
|
||||
@JsonProperty("lowUnbounded")
|
||||
public boolean isLowerUnbounded()
|
||||
abstract class OffsetFrame implements WindowFrame
|
||||
{
|
||||
return lowerUnbounded;
|
||||
}
|
||||
@JsonProperty
|
||||
public final Integer lowerOffset;
|
||||
@JsonProperty
|
||||
public final Integer upperOffset;
|
||||
|
||||
@JsonProperty("lowOffset")
|
||||
public int getLowerOffset()
|
||||
{
|
||||
return lowerOffset;
|
||||
}
|
||||
|
||||
@JsonProperty("uppUnbounded")
|
||||
public boolean isUpperUnbounded()
|
||||
{
|
||||
return upperUnbounded;
|
||||
}
|
||||
|
||||
@JsonProperty("uppOffset")
|
||||
public int getUpperOffset()
|
||||
{
|
||||
return upperOffset;
|
||||
}
|
||||
|
||||
@JsonProperty("orderBy")
|
||||
public List<ColumnWithDirection> getOrderBy()
|
||||
{
|
||||
return orderBy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
@JsonCreator
|
||||
public OffsetFrame(
|
||||
@JsonProperty("lowerOffset") Integer lowerOffset,
|
||||
@JsonProperty("upperOffset") Integer upperOffset)
|
||||
{
|
||||
this.lowerOffset = lowerOffset;
|
||||
this.upperOffset = upperOffset;
|
||||
}
|
||||
if (!(o instanceof WindowFrame)) {
|
||||
return false;
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(lowerOffset, upperOffset);
|
||||
}
|
||||
WindowFrame that = (WindowFrame) o;
|
||||
return lowerUnbounded == that.lowerUnbounded
|
||||
&& lowerOffset == that.lowerOffset
|
||||
&& upperUnbounded == that.upperUnbounded
|
||||
&& upperOffset == that.upperOffset
|
||||
&& peerType == that.peerType
|
||||
&& Objects.equals(orderBy, that.orderBy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(peerType, lowerUnbounded, lowerOffset, upperUnbounded, upperOffset, orderBy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "WindowFrame{" +
|
||||
"peerType=" + peerType +
|
||||
", lowerUnbounded=" + lowerUnbounded +
|
||||
", lowerOffset=" + lowerOffset +
|
||||
", upperUnbounded=" + upperUnbounded +
|
||||
", upperOffset=" + upperOffset +
|
||||
", orderBy=" + orderBy +
|
||||
'}';
|
||||
}
|
||||
|
||||
public static WindowFrame forOrderBy(ColumnWithDirection... orderBy)
|
||||
{
|
||||
return new WindowFrame(PeerType.RANGE, true, 0, false, 0, Lists.newArrayList(orderBy));
|
||||
}
|
||||
|
||||
public List<String> getOrderByColNames()
|
||||
{
|
||||
if (orderBy == null) {
|
||||
return Collections.emptyList();
|
||||
/**
|
||||
* Calculates the applicable lower offset if the max number of rows is
|
||||
* known.
|
||||
*/
|
||||
public int getLowerOffsetClamped(int maxRows)
|
||||
{
|
||||
if (lowerOffset == null) {
|
||||
return -maxRows;
|
||||
}
|
||||
return Math.max(-maxRows, lowerOffset);
|
||||
}
|
||||
return orderBy.stream().map(ColumnWithDirection::getColumn).collect(Collectors.toList());
|
||||
|
||||
/**
|
||||
* Calculates the applicable upper offset if the max number of rows is
|
||||
* known.
|
||||
*/
|
||||
public int getUpperOffsetClamped(int maxRows)
|
||||
{
|
||||
if (upperOffset == null) {
|
||||
return maxRows;
|
||||
}
|
||||
return Math.min(maxRows, upperOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
OffsetFrame other = (OffsetFrame) obj;
|
||||
return Objects.equals(lowerOffset, other.lowerOffset) && Objects.equals(upperOffset, other.upperOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract String toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the applicable lower offset if the max number of rows is known.
|
||||
*/
|
||||
public int getLowerOffsetClamped(int maxRows)
|
||||
class Rows extends OffsetFrame
|
||||
{
|
||||
if (lowerUnbounded) {
|
||||
return -maxRows;
|
||||
@JsonCreator
|
||||
public Rows(
|
||||
@JsonProperty("lowerOffset") Integer lowerOffset,
|
||||
@JsonProperty("upperOffset") Integer upperOffset)
|
||||
{
|
||||
super(lowerOffset, upperOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "WindowFrame.Rows ["
|
||||
+ "lowerOffset=" + lowerOffset +
|
||||
", upperOffset=" + upperOffset +
|
||||
"]";
|
||||
}
|
||||
return Math.max(-maxRows, lowerOffset);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the applicable upper offset if the max number of rows is known.
|
||||
*/
|
||||
public int getUpperOffsetClamped(int maxRows)
|
||||
class Groups extends OffsetFrame
|
||||
{
|
||||
if (upperUnbounded) {
|
||||
return maxRows;
|
||||
@JsonProperty
|
||||
private final ImmutableList<String> orderByColumns;
|
||||
|
||||
@JsonCreator
|
||||
public Groups(
|
||||
@JsonProperty("lowerOffset") Integer lowerOffset,
|
||||
@JsonProperty("upperOffset") Integer upperOffset,
|
||||
@JsonProperty("orderByColumns") List<String> orderByColumns)
|
||||
{
|
||||
super(lowerOffset, upperOffset);
|
||||
this.orderByColumns = ImmutableList.copyOf(orderByColumns);
|
||||
}
|
||||
return Math.min(maxRows, upperOffset);
|
||||
|
||||
public List<String> getOrderByColumns()
|
||||
{
|
||||
return orderByColumns;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(lowerOffset, orderByColumns, upperOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Groups other = (Groups) obj;
|
||||
return Objects.equals(lowerOffset, other.lowerOffset)
|
||||
&& Objects.equals(orderByColumns, other.orderByColumns)
|
||||
&& Objects.equals(upperOffset, other.upperOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "WindowFrame.Groups [" +
|
||||
"lowerOffset=" + lowerOffset +
|
||||
", upperOffset=" + upperOffset +
|
||||
", orderByColumns=" + orderByColumns + "]";
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
default <T extends WindowFrame> T unwrap(Class<T> clazz)
|
||||
{
|
||||
if (clazz.isInstance(this)) {
|
||||
return (T) this;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.query.rowsandcols.semantic;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.UOE;
|
||||
import org.apache.druid.query.aggregation.Aggregator;
|
||||
|
@ -28,6 +29,9 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
|
|||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import org.apache.druid.query.operator.window.WindowFrame;
|
||||
import org.apache.druid.query.operator.window.WindowFrame.Groups;
|
||||
import org.apache.druid.query.operator.window.WindowFrame.OffsetFrame;
|
||||
import org.apache.druid.query.operator.window.WindowFrame.Rows;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
|
@ -106,22 +110,38 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
public static Iterable<AggInterval> buildIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
{
|
||||
int numRows = rac.numRows();
|
||||
if (frame.getLowerOffsetClamped(numRows) == -numRows && frame.getUpperOffsetClamped(numRows) == numRows) {
|
||||
return buildUnboundedIteratorFor(rac, frame);
|
||||
} else if (frame.getPeerType() == WindowFrame.PeerType.RANGE) {
|
||||
return buildGroupIteratorFor(rac, frame);
|
||||
} else {
|
||||
return buildRowIteratorFor(rac, frame);
|
||||
if (isEffectivelyUnbounded(frame, numRows)) {
|
||||
return buildUnboundedIteratorFor(rac);
|
||||
}
|
||||
Rows rowsFrame = frame.unwrap(WindowFrame.Rows.class);
|
||||
if (rowsFrame != null) {
|
||||
return buildRowIteratorFor(rac, rowsFrame);
|
||||
}
|
||||
Groups groupsFrame = frame.unwrap(WindowFrame.Groups.class);
|
||||
if (groupsFrame != null) {
|
||||
return buildGroupIteratorFor(rac, groupsFrame);
|
||||
}
|
||||
throw DruidException.defensive("Unable to handle WindowFrame [%s]!", frame);
|
||||
}
|
||||
|
||||
private static Iterable<AggInterval> buildUnboundedIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
private static boolean isEffectivelyUnbounded(WindowFrame frame, int numRows)
|
||||
{
|
||||
int[] groupBoundaries = new int[]{0, rac.numRows()};
|
||||
return new GroupIteratorForWindowFrame(frame, groupBoundaries);
|
||||
OffsetFrame offsetFrame = frame.unwrap(WindowFrame.OffsetFrame.class);
|
||||
if (offsetFrame.getLowerOffsetClamped(numRows) == -numRows
|
||||
&& offsetFrame.getUpperOffsetClamped(numRows) == numRows) {
|
||||
// regardless the actual mode; all rows will be inside the frame!
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Iterable<AggInterval> buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
private static Iterable<AggInterval> buildUnboundedIteratorFor(AppendableRowsAndColumns rac)
|
||||
{
|
||||
int[] groupBoundaries = new int[] {0, rac.numRows()};
|
||||
return new GroupIteratorForWindowFrame(WindowFrame.rows(null, null), groupBoundaries);
|
||||
}
|
||||
|
||||
private static Iterable<AggInterval> buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame.Rows frame)
|
||||
{
|
||||
int[] groupBoundaries = new int[rac.numRows() + 1];
|
||||
for (int j = 0; j < groupBoundaries.length; j++) {
|
||||
|
@ -130,9 +150,9 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
return new GroupIteratorForWindowFrame(frame, groupBoundaries);
|
||||
}
|
||||
|
||||
private static Iterable<AggInterval> buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
private static Iterable<AggInterval> buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame.Groups frame)
|
||||
{
|
||||
int[] groupBoundaries = ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
|
||||
int[] groupBoundaries = ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColumns());
|
||||
return new GroupIteratorForWindowFrame(frame, groupBoundaries);
|
||||
}
|
||||
|
||||
|
@ -145,7 +165,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
// upper exclusive
|
||||
private final int upperOffset;
|
||||
|
||||
public GroupIteratorForWindowFrame(WindowFrame frame, int[] groupBoundaries)
|
||||
public GroupIteratorForWindowFrame(WindowFrame.OffsetFrame frame, int[] groupBoundaries)
|
||||
{
|
||||
this.groupBoundaries = groupBoundaries;
|
||||
numGroups = groupBoundaries.length - 1;
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.operator.window;
|
||||
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.druid.query.operator.window.WindowFrame.OffsetFrame;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class WindowFrameTest
|
||||
{
|
||||
@Test
|
||||
public void testEqualsRows()
|
||||
{
|
||||
EqualsVerifier.forClass(WindowFrame.Rows.class)
|
||||
.usingGetClass()
|
||||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEqualsGroups()
|
||||
{
|
||||
EqualsVerifier.forClass(WindowFrame.Groups.class)
|
||||
.usingGetClass()
|
||||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetFrameUnbounded()
|
||||
{
|
||||
OffsetFrame of = new WindowFrame.Rows(null, null);
|
||||
assertEquals(-100, of.getLowerOffsetClamped(100));
|
||||
assertEquals(100, of.getUpperOffsetClamped(100));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetFrameNormal()
|
||||
{
|
||||
OffsetFrame of = new WindowFrame.Rows(-1, 2);
|
||||
assertEquals(-1, of.getLowerOffsetClamped(100));
|
||||
assertEquals(2, of.getUpperOffsetClamped(100));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetFrameUnbounded2()
|
||||
{
|
||||
OffsetFrame of = new WindowFrame.Rows(-200, 200);
|
||||
assertEquals(-100, of.getLowerOffsetClamped(100));
|
||||
assertEquals(100, of.getUpperOffsetClamped(100));
|
||||
}
|
||||
|
||||
}
|
|
@ -46,7 +46,7 @@ public class WindowFramedAggregateProcessorTest
|
|||
@Test
|
||||
public void testIsPassThruWhenRACReturnsSemanticInterface()
|
||||
{
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.rows(null, 0);
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new LongMaxAggregatorFactory("cummMax", "intCol"),
|
||||
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
|
||||
|
@ -78,7 +78,7 @@ public class WindowFramedAggregateProcessorTest
|
|||
@Test
|
||||
public void testDoesStuffWhenNoSemanticInterfacesAvailable()
|
||||
{
|
||||
final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0, null);
|
||||
final WindowFrame theFrame = WindowFrame.rows(null, 0);
|
||||
final AggregatorFactory[] theAggs = {
|
||||
new LongSumAggregatorFactory("sum", "intCol")
|
||||
};
|
||||
|
|
|
@ -25,10 +25,8 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
|
|||
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.query.operator.ColumnWithDirection;
|
||||
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
|
||||
import org.apache.druid.query.operator.window.WindowFrame;
|
||||
import org.apache.druid.query.operator.window.WindowFrame.PeerType;
|
||||
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
|
@ -65,7 +63,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 0, null),
|
||||
WindowFrame.rows(0, 0),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -91,7 +89,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 2, null),
|
||||
WindowFrame.rows(-1, 2),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -117,7 +115,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 2, null),
|
||||
WindowFrame.rows(0, 2),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -143,7 +141,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -2, false, 0, null),
|
||||
WindowFrame.rows(-2, 0),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -169,7 +167,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 7, null),
|
||||
WindowFrame.rows(-5, 7),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -197,7 +195,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 1, null),
|
||||
WindowFrame.rows(-5, 1),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -225,7 +223,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
|
||||
WindowFrame.rows(-5, 0),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -253,7 +251,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 7, null),
|
||||
WindowFrame.rows(-1, 7),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -281,7 +279,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7, null),
|
||||
WindowFrame.rows(0, 7),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -309,7 +307,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7, null),
|
||||
WindowFrame.rows(0, 7),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -337,7 +335,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
|
||||
WindowFrame.rows(-5, 0),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -371,7 +369,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null),
|
||||
WindowFrame.unbounded(),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new LongSumAggregatorFactory("sumFromDouble", "doubleCol"),
|
||||
|
@ -409,7 +407,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0, null),
|
||||
WindowFrame.rows(null, 0),
|
||||
new AggregatorFactory[]{
|
||||
new LongMaxAggregatorFactory("cummMax", "intCol"),
|
||||
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
|
||||
|
@ -443,7 +441,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, true, 0, null),
|
||||
WindowFrame.rows(0, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongMaxAggregatorFactory("cummMax", "intCol"),
|
||||
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
|
||||
|
@ -465,7 +463,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
@Test
|
||||
public void testRangeOrderBy()
|
||||
{
|
||||
WindowFrame frame = WindowFrame.forOrderBy(ColumnWithDirection.ascending("c1"));
|
||||
WindowFrame frame = WindowFrame.forOrderBy("c1");
|
||||
int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2};
|
||||
int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2};
|
||||
int[] resVals = new int[] {4, 4, 4, 8, 8, 8, 13, 13, 13, 13};
|
||||
|
@ -476,14 +474,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
@Test
|
||||
public void testRangeB1()
|
||||
{
|
||||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
-1,
|
||||
false,
|
||||
0,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
);
|
||||
WindowFrame frame = WindowFrame.groups(-1, 0, Collections.singletonList("c1"));
|
||||
|
||||
int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
|
||||
int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
|
||||
|
@ -495,14 +486,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
@Test
|
||||
public void testRangeA1()
|
||||
{
|
||||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
0,
|
||||
false,
|
||||
1,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
);
|
||||
WindowFrame frame = WindowFrame.groups(0, 1, Collections.singletonList("c1"));
|
||||
|
||||
int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
|
||||
int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
|
||||
|
@ -514,14 +498,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
@Test
|
||||
public void testRangeB1A1()
|
||||
{
|
||||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
-1,
|
||||
false,
|
||||
1,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
);
|
||||
WindowFrame frame = WindowFrame.groups(-1, 1, Collections.singletonList("c1"));
|
||||
|
||||
int[] c1Vals = new int[] {0, 1, 2, 3, 4, 5};
|
||||
int[] c2Vals = new int[] {0, 1, 2, 3, 4, 5};
|
||||
|
@ -534,14 +511,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
@Test
|
||||
public void testRangeB1A1_2()
|
||||
{
|
||||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
-1,
|
||||
false,
|
||||
1,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
);
|
||||
WindowFrame frame = WindowFrame.groups(-1, 1, Collections.singletonList("c1"));
|
||||
|
||||
int[] c1Vals = new int[] {0, 0, 1, 2, 3, 3, 4, 4, 5};
|
||||
int[] c2Vals = new int[] {0, 0, 1, 2, 2, 1, 2, 2, 5};
|
||||
|
@ -553,14 +523,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
@Test
|
||||
public void testRangeB1A2()
|
||||
{
|
||||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
-1,
|
||||
false,
|
||||
2,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
);
|
||||
WindowFrame frame = WindowFrame.groups(-1, 2, Collections.singletonList("c1"));
|
||||
|
||||
int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
|
||||
int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1};
|
||||
|
|
|
@ -449,19 +449,22 @@ public class Windowing
|
|||
if (group.lowerBound.isUnbounded() && group.upperBound.isUnbounded()) {
|
||||
return WindowFrame.unbounded();
|
||||
}
|
||||
return new WindowFrame(
|
||||
group.isRows ? WindowFrame.PeerType.ROWS : WindowFrame.PeerType.RANGE,
|
||||
group.lowerBound.isUnbounded(),
|
||||
figureOutOffset(group.lowerBound),
|
||||
group.upperBound.isUnbounded(),
|
||||
figureOutOffset(group.upperBound),
|
||||
group.isRows ? null : getOrdering()
|
||||
);
|
||||
if (group.isRows) {
|
||||
return WindowFrame.rows(getBoundAsInteger(group.lowerBound), getBoundAsInteger(group.upperBound));
|
||||
} else {
|
||||
/* Right now we support GROUPS based framing in the native layer;
|
||||
* but the SQL layer doesn't accept that as of now.
|
||||
*/
|
||||
return WindowFrame.groups(getBoundAsInteger(group.lowerBound), getBoundAsInteger(group.upperBound), getOrderingColumNames());
|
||||
}
|
||||
}
|
||||
|
||||
private int figureOutOffset(RexWindowBound bound)
|
||||
private Integer getBoundAsInteger(RexWindowBound bound)
|
||||
{
|
||||
if (bound.isUnbounded() || bound.isCurrentRow()) {
|
||||
if (bound.isUnbounded()) {
|
||||
return null;
|
||||
}
|
||||
if (bound.isCurrentRow()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -15900,7 +15900,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
OperatorFactoryBuilders.naivePartitionOperator(),
|
||||
OperatorFactoryBuilders.windowOperators(
|
||||
OperatorFactoryBuilders.framedAggregateProcessor(
|
||||
WindowFrame.forOrderBy(ColumnWithDirection.ascending("d0")),
|
||||
WindowFrame.forOrderBy("d0"),
|
||||
new LongSumAggregatorFactory("w0", "a0")
|
||||
)
|
||||
)
|
||||
|
|
|
@ -45,8 +45,10 @@ import org.junit.jupiter.params.provider.MethodSource;
|
|||
import java.io.File;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -66,6 +68,11 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
|
|||
|
||||
private static final ObjectMapper YAML_JACKSON = new DefaultObjectMapper(new YAMLFactory(), "tests");
|
||||
|
||||
private static final Map<String, Object> DEFAULT_QUERY_CONTEXT = ImmutableMap.of(
|
||||
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
|
||||
QueryContexts.ENABLE_DEBUG, true
|
||||
);
|
||||
|
||||
public static Object[] parametersForWindowQueryTest() throws Exception
|
||||
{
|
||||
final URL windowFolderUrl = ClassLoader.getSystemResource("calcite/tests/window");
|
||||
|
@ -184,6 +191,11 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
|
|||
log.info("Actual results:\n%s", sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public Map<? extends String, ? extends Object> getQueryContext()
|
||||
{
|
||||
return input.queryContext == null ? Collections.emptyMap() : input.queryContext;
|
||||
}
|
||||
}
|
||||
|
||||
@MethodSource("parametersForWindowQueryTest")
|
||||
|
@ -199,32 +211,11 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
|
|||
testBuilder()
|
||||
.skipVectorize(true)
|
||||
.sql(testCase.getSql())
|
||||
.queryContext(ImmutableMap.of(
|
||||
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
|
||||
QueryContexts.ENABLE_DEBUG, true
|
||||
))
|
||||
.addCustomVerification(QueryVerification.ofResults(testCase))
|
||||
.run();
|
||||
}
|
||||
}
|
||||
|
||||
@MethodSource("parametersForWindowQueryTest")
|
||||
@ParameterizedTest(name = "{0}")
|
||||
@SuppressWarnings("unchecked")
|
||||
public void windowQueryTestWithCustomContextMaxSubqueryBytes(String filename) throws Exception
|
||||
{
|
||||
TestCase testCase = new TestCase(filename);
|
||||
|
||||
assumeTrue(testCase.getType() != TestType.failingTest);
|
||||
|
||||
if (testCase.getType() == TestType.operatorValidation) {
|
||||
testBuilder()
|
||||
.skipVectorize(true)
|
||||
.sql(testCase.getSql())
|
||||
.queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true,
|
||||
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
|
||||
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000"
|
||||
)
|
||||
.queryContext(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.putAll(DEFAULT_QUERY_CONTEXT)
|
||||
.putAll(testCase.getQueryContext())
|
||||
.build()
|
||||
)
|
||||
.addCustomVerification(QueryVerification.ofResults(testCase))
|
||||
.run();
|
||||
|
@ -241,10 +232,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
|
|||
+ "where countryName in ('Austria', 'Republic of Korea') "
|
||||
+ "and (cityName in ('Vienna', 'Seoul') or cityName is null)\n"
|
||||
+ "group by countryName, cityName, channel")
|
||||
.queryContext(ImmutableMap.of(
|
||||
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
|
||||
QueryContexts.ENABLE_DEBUG, true
|
||||
))
|
||||
.queryContext(DEFAULT_QUERY_CONTEXT)
|
||||
.expectedResults(
|
||||
ResultMatchMode.RELAX_NULLS,
|
||||
ImmutableList.of(
|
||||
|
@ -277,9 +265,13 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
|
|||
failingTest,
|
||||
operatorValidation
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public TestType type;
|
||||
|
||||
@JsonProperty
|
||||
public Map<String, String> queryContext;
|
||||
|
||||
@JsonProperty
|
||||
public String sql;
|
||||
|
||||
|
|
|
@ -15,13 +15,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: true
|
||||
lowOffset: 0
|
||||
uppUnbounded: true
|
||||
uppOffset: 0
|
||||
orderBy: null
|
||||
frame: { type: "rows" }
|
||||
aggregations:
|
||||
- { "type": "doubleSum", "name": "w1", "fieldName": "_d1" }
|
||||
- type: "naiveSort"
|
||||
|
@ -33,13 +27,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: true
|
||||
lowOffset: 0
|
||||
uppUnbounded: true
|
||||
uppOffset: 0
|
||||
orderBy: null
|
||||
frame: { type: "rows" }
|
||||
aggregations:
|
||||
- { "type": "doubleSum", "name": "w0", "fieldName": "_d0" }
|
||||
expectedResults:
|
||||
|
|
|
@ -13,7 +13,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 }
|
||||
frame: { type: "rows" }
|
||||
aggregations:
|
||||
- { type: "count", name: "w0" }
|
||||
|
||||
|
|
|
@ -19,60 +19,43 @@ expectedOperators:
|
|||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: true
|
||||
lowOffset: 0
|
||||
uppUnbounded: false
|
||||
uppOffset: 0
|
||||
orderBy: null
|
||||
type: "rows"
|
||||
upperOffset: 0
|
||||
aggregations:
|
||||
- { type: "count", name: "w0" }
|
||||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: false
|
||||
lowOffset: -1
|
||||
uppUnbounded: false
|
||||
uppOffset: 0
|
||||
orderBy: null
|
||||
type: "rows"
|
||||
lowerOffset: -1
|
||||
upperOffset: 0
|
||||
aggregations:
|
||||
- { type: "count", name: "w1" }
|
||||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: false
|
||||
lowOffset: 0
|
||||
uppUnbounded: false
|
||||
uppOffset: 0
|
||||
orderBy: null
|
||||
type: "rows"
|
||||
lowerOffset: 0
|
||||
upperOffset: 0
|
||||
aggregations:
|
||||
- { type: "count", name: "w2" }
|
||||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: false
|
||||
lowOffset: 0
|
||||
uppUnbounded: false
|
||||
uppOffset: 1
|
||||
orderBy: null
|
||||
type: "rows"
|
||||
lowerOffset: 0
|
||||
upperOffset: 1
|
||||
aggregations:
|
||||
- { type: "count", name: "w3" }
|
||||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: false
|
||||
lowOffset: 0
|
||||
uppUnbounded: true
|
||||
uppOffset: 0
|
||||
orderBy: null
|
||||
type: "rows"
|
||||
lowerOffset: 0
|
||||
aggregations:
|
||||
- { type: "count", name: "w4" }
|
||||
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
type: "failingTest"
|
||||
type: "operatorValidation"
|
||||
|
||||
sql: |
|
||||
SELECT
|
||||
m1,
|
||||
COUNT(m1) OVER () cc
|
||||
SUM(m1) OVER () cc
|
||||
FROM druid.foo
|
||||
|
||||
expectedOperators:
|
||||
|
@ -12,18 +12,16 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 }
|
||||
frame: { type: "rows" }
|
||||
aggregations:
|
||||
- type: "filtered"
|
||||
aggregator: {"type":"count","name":"w0"}
|
||||
filter:
|
||||
type: not
|
||||
field: {"type":"null","column":"m1"}
|
||||
name: null
|
||||
- type: doubleSum
|
||||
name: w0
|
||||
fieldName: m1
|
||||
|
||||
expectedResults:
|
||||
- [1.0,6]
|
||||
- [2.0,6]
|
||||
- [3.0,6]
|
||||
- [4.0,6]
|
||||
- [5.0,6]
|
||||
- [6.0,6]
|
||||
- [1.0,21.0]
|
||||
- [2.0,21.0]
|
||||
- [3.0,21.0]
|
||||
- [4.0,21.0]
|
||||
- [5.0,21.0]
|
||||
- [6.0,21.0]
|
||||
|
|
|
@ -12,7 +12,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 }
|
||||
frame: { type: rows }
|
||||
aggregations:
|
||||
- type: "doubleSum"
|
||||
name: "w0"
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
type: "operatorValidation"
|
||||
|
||||
queryContext:
|
||||
maxSubqueryBytes: 100000
|
||||
|
||||
sql: |
|
||||
SELECT
|
||||
__time
|
||||
|
|
|
@ -14,12 +14,9 @@ expectedOperators:
|
|||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "RANGE"
|
||||
lowUnbounded: true
|
||||
lowOffset: 0
|
||||
uppUnbounded: false
|
||||
uppOffset: 0
|
||||
orderBy: [ {column: "d0", direction: ASC} ]
|
||||
type: groups
|
||||
upperOffset: 0
|
||||
orderByColumns: [ "d0" ]
|
||||
aggregations:
|
||||
- { type: "longSum", name: "w0", fieldName: "a0" }
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 }
|
||||
frame: { type: "rows" }
|
||||
aggregations:
|
||||
- { type: "doubleMin", name: "w0", fieldName: "_v0" }
|
||||
- { type: "longMin", name: "w1", fieldName: "v1" }
|
||||
|
|
|
@ -16,12 +16,9 @@ expectedOperators:
|
|||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: false
|
||||
lowOffset: -3
|
||||
uppUnbounded: false
|
||||
uppOffset: 2
|
||||
orderBy: null
|
||||
type: rows
|
||||
lowerOffset: -3
|
||||
upperOffset: 2
|
||||
aggregations:
|
||||
- { type: "longSum", name: "w0", fieldName: "a0" }
|
||||
- { type: "naiveSort", columns: [ { column: "d1", direction: "ASC" }, { column: "a0", direction: "ASC"} ]}
|
||||
|
|
|
@ -39,14 +39,9 @@ expectedOperators:
|
|||
- { "type": "cumeDist", "group": [ "a0" ], "outputColumn": "w9" }
|
||||
- type: "framedAgg"
|
||||
frame:
|
||||
peerType: "RANGE"
|
||||
lowUnbounded: true
|
||||
lowOffset: 0
|
||||
uppUnbounded: false
|
||||
uppOffset: 0
|
||||
orderBy:
|
||||
- column: a0
|
||||
direction: ASC
|
||||
type: groups
|
||||
upperOffset: 0
|
||||
orderByColumns: [ a0 ]
|
||||
aggregations:
|
||||
- { "type": "longSum", "name": "w0", "fieldName": "a0" }
|
||||
|
||||
|
|
|
@ -15,11 +15,9 @@ expectedOperators:
|
|||
processor:
|
||||
type: "framedAgg"
|
||||
frame:
|
||||
peerType: "ROWS"
|
||||
lowUnbounded: false
|
||||
lowOffset: -3
|
||||
uppUnbounded: false
|
||||
uppOffset: 2
|
||||
type: "rows"
|
||||
lowerOffset: -3
|
||||
upperOffset: 2
|
||||
orderBy: null
|
||||
aggregations:
|
||||
- { type: "longSum", name: "w0", fieldName: "a0" }
|
||||
|
|
|
@ -19,7 +19,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 }
|
||||
frame: { type: "rows" }
|
||||
aggregations:
|
||||
- { "type": "longSum", "name": "w0", "fieldName": "a0" }
|
||||
- type: "window"
|
||||
|
|
|
@ -28,7 +28,7 @@ expectedOperators:
|
|||
- { "type": "last", "inputColumn": "a0", "outputColumn": "w2" }
|
||||
- { "type": "percentile", "outputColumn": "w3", "numBuckets": 3 }
|
||||
- type: "framedAgg"
|
||||
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 }
|
||||
frame: { type: "rows" }
|
||||
aggregations:
|
||||
- { "type": "longSum", "name": "w0", "fieldName": "a0" }
|
||||
- type: "window"
|
||||
|
|
|
@ -19,7 +19,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 }
|
||||
frame: { type: rows }
|
||||
aggregations:
|
||||
- { type: "longMin", name: "w0", fieldName: "l2" }
|
||||
- type: "naiveSort"
|
||||
|
@ -31,7 +31,7 @@ expectedOperators:
|
|||
- type: "window"
|
||||
processor:
|
||||
type: "framedAgg"
|
||||
frame: { peerType: "RANGE", lowUnbounded: true, lowOffset: 0, uppUnbounded: false, uppOffset: 0, orderBy: [{ column: l1, direction: ASC }] }
|
||||
frame: { type: groups, upperOffset: 0, orderByColumns: [ l1 ] }
|
||||
aggregations:
|
||||
- { type: "longMin", name: "w1", fieldName: "l2" }
|
||||
|
||||
|
|
Loading…
Reference in New Issue