mirror of https://github.com/apache/druid.git
Make use group iterator for all window frames & support for same bound kinds (#16603)
Fixes apache/druid#15739
This commit is contained in:
parent
0fe6a2af68
commit
990fd5f5fb
|
@ -161,9 +161,9 @@ public class WindowFrame
|
|||
public int getLowerOffsetClamped(int maxRows)
|
||||
{
|
||||
if (lowerUnbounded) {
|
||||
return maxRows;
|
||||
return -maxRows;
|
||||
}
|
||||
return Math.min(maxRows, lowerOffset);
|
||||
return Math.max(-maxRows, lowerOffset);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.druid.query.dimension.DimensionSpec;
|
|||
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
|
||||
import org.apache.druid.query.operator.window.WindowFrame;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn;
|
||||
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
|
@ -61,34 +60,15 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
AggregatorFactory[] aggFactories
|
||||
)
|
||||
{
|
||||
if (frame.isLowerUnbounded() && frame.isUpperUnbounded()) {
|
||||
return computeUnboundedAggregates(aggFactories);
|
||||
}
|
||||
|
||||
if (frame.getPeerType() == WindowFrame.PeerType.ROWS) {
|
||||
if (frame.isLowerUnbounded()) {
|
||||
return computeCumulativeAggregates(aggFactories, frame.getUpperOffset());
|
||||
} else if (frame.isUpperUnbounded()) {
|
||||
return computeReverseCumulativeAggregates(aggFactories, frame.getLowerOffset());
|
||||
} else {
|
||||
final int numRows = rac.numRows();
|
||||
int lowerOffset = frame.getLowerOffset();
|
||||
int upperOffset = frame.getUpperOffset();
|
||||
|
||||
if (numRows < lowerOffset + upperOffset + 1) {
|
||||
// In this case, there are not enough rows to completely build up the full window aperture before it needs to
|
||||
// also start contracting the aperture because of the upper offset. So we use a method that specifically
|
||||
// handles checks for both expanding and reducing the aperture on every iteration.
|
||||
return aggregateWindowApertureInFlux(aggFactories, lowerOffset, upperOffset);
|
||||
} else {
|
||||
// In this case, there are 3 distinct phases that allow us to loop with less
|
||||
// branches, so we have a method that specifically does that.
|
||||
return aggregateWindowApertureWellBehaved(aggFactories, lowerOffset, upperOffset);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return computeGroupAggregates(aggFactories, frame);
|
||||
Iterable<AggInterval> groupIterator = buildIteratorFor(rac, frame);
|
||||
ResultPopulator resultRac = new ResultPopulator(aggFactories, rac.numRows());
|
||||
AggIntervalCursor aggCursor = new AggIntervalCursor(rac, aggFactories);
|
||||
for (AggInterval aggInterval : groupIterator) {
|
||||
aggCursor.moveTo(aggInterval.inputRows);
|
||||
resultRac.write(aggInterval.outputRows, aggCursor);
|
||||
}
|
||||
resultRac.appendTo(rac);
|
||||
return rac;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -124,22 +104,34 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
}
|
||||
}
|
||||
|
||||
private RowsAndColumns computeGroupAggregates(
|
||||
AggregatorFactory[] aggFactories,
|
||||
WindowFrame frame)
|
||||
public static Iterable<AggInterval> buildIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
{
|
||||
Iterable<AggInterval> groupIterator = buildGroupIteratorFor(rac, frame);
|
||||
ResultPopulator resultRac = new ResultPopulator(aggFactories, rac.numRows());
|
||||
AggIntervalCursor aggCursor = new AggIntervalCursor(rac, aggFactories);
|
||||
for (AggInterval aggInterval : groupIterator) {
|
||||
aggCursor.moveTo(aggInterval.inputRows);
|
||||
resultRac.write(aggInterval.outputRows, aggCursor);
|
||||
int numRows = rac.numRows();
|
||||
if (frame.getLowerOffsetClamped(numRows) == -numRows && frame.getUpperOffsetClamped(numRows) == numRows) {
|
||||
return buildUnboundedIteratorFor(rac, frame);
|
||||
} else if (frame.getPeerType() == WindowFrame.PeerType.RANGE) {
|
||||
return buildGroupIteratorFor(rac, frame);
|
||||
} else {
|
||||
return buildRowIteratorFor(rac, frame);
|
||||
}
|
||||
resultRac.appendTo(rac);
|
||||
return rac;
|
||||
}
|
||||
|
||||
public static Iterable<AggInterval> buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
private static Iterable<AggInterval> buildUnboundedIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
{
|
||||
int[] groupBoundaries = new int[]{0, rac.numRows()};
|
||||
return new GroupIteratorForWindowFrame(frame, groupBoundaries);
|
||||
}
|
||||
|
||||
private static Iterable<AggInterval> buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
{
|
||||
int[] groupBoundaries = new int[rac.numRows() + 1];
|
||||
for (int j = 0; j < groupBoundaries.length; j++) {
|
||||
groupBoundaries[j] = j;
|
||||
}
|
||||
return new GroupIteratorForWindowFrame(frame, groupBoundaries);
|
||||
}
|
||||
|
||||
private static Iterable<AggInterval> buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
|
||||
{
|
||||
int[] groupBoundaries = ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
|
||||
return new GroupIteratorForWindowFrame(frame, groupBoundaries);
|
||||
|
@ -187,7 +179,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
groupToRowIndex(relativeGroupId(1))
|
||||
),
|
||||
Interval.of(
|
||||
groupToRowIndex(relativeGroupId(-lowerOffset)),
|
||||
groupToRowIndex(relativeGroupId(lowerOffset)),
|
||||
groupToRowIndex(relativeGroupId(upperOffset))
|
||||
)
|
||||
);
|
||||
|
@ -365,6 +357,10 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
for (int i = currentRows.b; i < newRows.b; i++) {
|
||||
aggregate(i);
|
||||
}
|
||||
} else if (currentRows.a > newRows.a && currentRows.b == newRows.b) {
|
||||
for (int i = newRows.a; i < currentRows.a; i++) {
|
||||
aggregate(i);
|
||||
}
|
||||
} else {
|
||||
newAggregators();
|
||||
for (int i : newRows) {
|
||||
|
@ -390,428 +386,6 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
}
|
||||
}
|
||||
|
||||
private AppendableRowsAndColumns computeUnboundedAggregates(AggregatorFactory[] aggFactories)
|
||||
{
|
||||
Aggregator[] aggs = new Aggregator[aggFactories.length];
|
||||
|
||||
|
||||
AtomicInteger currRow = new AtomicInteger(0);
|
||||
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(currRow);
|
||||
|
||||
for (int i = 0; i < aggFactories.length; i++) {
|
||||
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
int numRows = rac.numRows();
|
||||
int rowId = currRow.get();
|
||||
while (rowId < numRows) {
|
||||
for (Aggregator agg : aggs) {
|
||||
agg.aggregate();
|
||||
}
|
||||
rowId = currRow.incrementAndGet();
|
||||
}
|
||||
|
||||
for (int i = 0; i < aggFactories.length; ++i) {
|
||||
rac.addColumn(
|
||||
aggFactories[i].getName(),
|
||||
new ConstantObjectColumn(aggs[i].get(), numRows, aggFactories[i].getIntermediateType())
|
||||
);
|
||||
aggs[i].close();
|
||||
}
|
||||
return rac;
|
||||
}
|
||||
|
||||
private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[] aggFactories, int upperOffset)
|
||||
{
|
||||
int numRows = rac.numRows();
|
||||
if (upperOffset > numRows) {
|
||||
return computeUnboundedAggregates(aggFactories);
|
||||
}
|
||||
|
||||
|
||||
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
|
||||
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
|
||||
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
|
||||
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
|
||||
// of trying to optimize this generic implementation.
|
||||
Object[][] results = new Object[aggFactories.length][numRows];
|
||||
int resultStorageIndex = 0;
|
||||
|
||||
AtomicInteger rowIdProvider = new AtomicInteger(0);
|
||||
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
|
||||
|
||||
AggregatorFactory[] combiningFactories = new AggregatorFactory[aggFactories.length];
|
||||
Aggregator[] aggs = new Aggregator[aggFactories.length];
|
||||
for (int i = 0; i < aggFactories.length; i++) {
|
||||
combiningFactories[i] = aggFactories[i].getCombiningFactory();
|
||||
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
// If there is an upper offset, we accumulate those aggregations before starting to generate results
|
||||
for (int i = 0; i < upperOffset; ++i) {
|
||||
for (Aggregator agg : aggs) {
|
||||
agg.aggregate();
|
||||
}
|
||||
rowIdProvider.incrementAndGet();
|
||||
}
|
||||
|
||||
// Prime the results
|
||||
if (rowIdProvider.get() < numRows) {
|
||||
for (int i = 0; i < aggs.length; i++) {
|
||||
aggs[i].aggregate();
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
|
||||
aggs[i].close();
|
||||
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
++resultStorageIndex;
|
||||
rowIdProvider.incrementAndGet();
|
||||
}
|
||||
|
||||
// From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
|
||||
for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
|
||||
for (int i = 0; i < aggs.length; i++) {
|
||||
aggs[i].aggregate();
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
|
||||
aggs[i].close();
|
||||
|
||||
// Use a combining aggregator to combine the result we just got with the result from the previous row
|
||||
// This is a lot of hoops to jump through just to combine two values, but AggregatorFactory.combine
|
||||
// allows for mutation of either of the arguments passed in, so it cannot be meaningfully used in this
|
||||
// context. Instead, we have to jump through these hoops to make sure that we are generating a new object.
|
||||
// It would've been nice if the AggregatorFactory interface had methods that were more usable for this,
|
||||
// but it doesn't so :shrug:
|
||||
final CumulativeColumnSelectorFactory combiningFactory = new CumulativeColumnSelectorFactory(
|
||||
aggFactories[i],
|
||||
results[i],
|
||||
resultStorageIndex - 1
|
||||
);
|
||||
final Aggregator combiningAgg = combiningFactories[i].factorize(combiningFactory);
|
||||
combiningAgg.aggregate();
|
||||
combiningFactory.increment();
|
||||
combiningAgg.aggregate();
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get());
|
||||
combiningAgg.close();
|
||||
|
||||
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
++resultStorageIndex;
|
||||
rowIdProvider.incrementAndGet();
|
||||
}
|
||||
|
||||
// If we haven't filled up all of the results yet, there are no more rows, so just point the rest of the results
|
||||
// at the last result that we generated
|
||||
for (Object[] resultArr : results) {
|
||||
Arrays.fill(resultArr, resultStorageIndex, resultArr.length, resultArr[resultStorageIndex - 1]);
|
||||
}
|
||||
|
||||
return makeReturnRAC(aggFactories, results);
|
||||
}
|
||||
|
||||
private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFactory[] aggFactories, int lowerOffset)
|
||||
{
|
||||
int numRows = rac.numRows();
|
||||
if (lowerOffset > numRows) {
|
||||
return computeUnboundedAggregates(aggFactories);
|
||||
}
|
||||
|
||||
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
|
||||
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
|
||||
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
|
||||
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
|
||||
// of trying to optimize this generic implementation.
|
||||
Object[][] results = new Object[aggFactories.length][numRows];
|
||||
int resultStorageIndex = numRows - 1;
|
||||
|
||||
AtomicInteger rowIdProvider = new AtomicInteger(numRows - 1);
|
||||
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
|
||||
|
||||
AggregatorFactory[] combiningFactories = new AggregatorFactory[aggFactories.length];
|
||||
Aggregator[] aggs = new Aggregator[aggFactories.length];
|
||||
for (int i = 0; i < aggFactories.length; i++) {
|
||||
combiningFactories[i] = aggFactories[i].getCombiningFactory();
|
||||
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
// If there is a lower offset, we accumulate those aggregations before starting to generate results
|
||||
for (int i = 0; i < lowerOffset; ++i) {
|
||||
for (Aggregator agg : aggs) {
|
||||
agg.aggregate();
|
||||
}
|
||||
rowIdProvider.decrementAndGet();
|
||||
}
|
||||
|
||||
// Prime the results
|
||||
if (rowIdProvider.get() >= 0) {
|
||||
for (int i = 0; i < aggs.length; i++) {
|
||||
aggs[i].aggregate();
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
|
||||
aggs[i].close();
|
||||
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
--resultStorageIndex;
|
||||
rowIdProvider.decrementAndGet();
|
||||
}
|
||||
|
||||
// From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
|
||||
for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) {
|
||||
for (int i = 0; i < aggs.length; i++) {
|
||||
aggs[i].aggregate();
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
|
||||
aggs[i].close();
|
||||
|
||||
// Use a combining aggregator to combine the result we just got with the result from the previous row
|
||||
// This is a lot of hoops to jump through just to combine two values, but AggregatorFactory.combine
|
||||
// allows for mutation of either of the arguments passed in, so it cannot be meaningfully used in this
|
||||
// context. Instead, we have to jump through these hoops to make sure that we are generating a new object.
|
||||
// It would've been nice if the AggregatorFactory interface had methods that were more usable for this,
|
||||
// but it doesn't so :shrug:
|
||||
final CumulativeColumnSelectorFactory combiningFactory = new CumulativeColumnSelectorFactory(
|
||||
aggFactories[i],
|
||||
results[i],
|
||||
resultStorageIndex + 1
|
||||
);
|
||||
final Aggregator combiningAgg = combiningFactories[i].factorize(combiningFactory);
|
||||
combiningAgg.aggregate();
|
||||
combiningFactory.decrement();
|
||||
combiningAgg.aggregate();
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get());
|
||||
combiningAgg.close();
|
||||
|
||||
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
--resultStorageIndex;
|
||||
rowIdProvider.decrementAndGet();
|
||||
}
|
||||
|
||||
// If we haven't filled up all of the results yet, there are no more rows, so just point the rest of the results
|
||||
// at the last result that we generated
|
||||
for (Object[] resultArr : results) {
|
||||
Arrays.fill(resultArr, 0, resultStorageIndex + 1, resultArr[resultStorageIndex + 1]);
|
||||
}
|
||||
|
||||
return makeReturnRAC(aggFactories, results);
|
||||
}
|
||||
|
||||
private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
|
||||
AggregatorFactory[] aggFactories,
|
||||
int lowerOffset,
|
||||
int upperOffset
|
||||
)
|
||||
{
|
||||
// There are 3 different phases of operation when we have more rows than our window size
|
||||
// 1. Our window is not full, as we walk the rows we build up towards filling it
|
||||
// 2. Our window is full, as we walk the rows we take a value off and add a new aggregation
|
||||
// 3. We are nearing the end of the rows, we need to start shrinking the window aperture
|
||||
|
||||
int numRows = rac.numRows();
|
||||
int windowSize = lowerOffset + upperOffset + 1;
|
||||
|
||||
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
|
||||
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
|
||||
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
|
||||
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
|
||||
// of trying to optimize this generic implementation.
|
||||
Object[][] results = new Object[aggFactories.length][numRows];
|
||||
int resultStorageIndex = 0;
|
||||
|
||||
AtomicInteger rowIdProvider = new AtomicInteger(0);
|
||||
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
|
||||
|
||||
// This is the number of aggregators to actually aggregate for the current row.
|
||||
// Which also doubles as the nextIndex to roll through as we roll things in and out of the window
|
||||
int nextIndex = lowerOffset + 1;
|
||||
|
||||
Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
|
||||
for (int i = 0; i < aggregators.length; i++) {
|
||||
final AggregatorFactory aggFactory = aggFactories[i];
|
||||
// instantiate the aggregators that need to be read on the first row.
|
||||
for (int j = 0; j < nextIndex; j++) {
|
||||
aggregators[i][j] = aggFactory.factorize(columnSelectorFactory);
|
||||
}
|
||||
}
|
||||
|
||||
// The first few rows will slowly build out the window to consume the upper-offset. The window will not
|
||||
// be full until we have walked upperOffset number of rows, so phase 1 runs until we have consumed
|
||||
// upperOffset number of rows.
|
||||
for (int upperIndex = 0; upperIndex < upperOffset; ++upperIndex) {
|
||||
for (Aggregator[] aggregator : aggregators) {
|
||||
for (int j = 0; j < nextIndex; ++j) {
|
||||
aggregator[j].aggregate();
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < aggFactories.length; ++i) {
|
||||
aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
++nextIndex;
|
||||
rowIdProvider.incrementAndGet();
|
||||
}
|
||||
|
||||
// End Phase 1, Enter Phase 2. At this point, nextIndex == windowSize, rowIdProvider is the same as
|
||||
// upperOffset and the aggregators matrix is entirely non-null. We need to iterate until our window has all of
|
||||
// the aggregators in it to fill up the final result set.
|
||||
int endResultStorageIndex = numRows - windowSize;
|
||||
for (; resultStorageIndex < endResultStorageIndex; ++resultStorageIndex) {
|
||||
for (Aggregator[] aggregator : aggregators) {
|
||||
for (Aggregator value : aggregator) {
|
||||
value.aggregate();
|
||||
}
|
||||
}
|
||||
|
||||
if (nextIndex == windowSize) {
|
||||
// Wrap back around and start pruning from the beginning of the window
|
||||
nextIndex = 0;
|
||||
}
|
||||
|
||||
for (int i = 0; i < aggFactories.length; ++i) {
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
|
||||
aggregators[i][nextIndex].close();
|
||||
aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory);
|
||||
}
|
||||
|
||||
++nextIndex;
|
||||
rowIdProvider.incrementAndGet();
|
||||
}
|
||||
|
||||
if (nextIndex == windowSize) {
|
||||
nextIndex = 0;
|
||||
}
|
||||
|
||||
// End Phase 2, enter Phase 3. At this point, our window has enough aggregators in it to fill up our final
|
||||
// result set. This means that for each new row that we complete, the window will "shrink" until we hit numRows,
|
||||
// at which point we will collect anything yet remaining and be done.
|
||||
|
||||
if (nextIndex != 0) {
|
||||
// Start by organizing the aggregators so that we are 0-indexed from nextIndex. This trades off creating
|
||||
// a new array of references in exchange for removing branches inside of the loop. It also makes the logic
|
||||
// simpler to understand.
|
||||
|
||||
Aggregator[][] reorganizedAggs = new Aggregator[aggFactories.length][windowSize];
|
||||
for (int i = 0; i < aggFactories.length; i++) {
|
||||
System.arraycopy(aggregators[i], nextIndex, reorganizedAggs[i], 0, windowSize - nextIndex);
|
||||
System.arraycopy(aggregators[i], 0, reorganizedAggs[i], windowSize - nextIndex, nextIndex);
|
||||
}
|
||||
aggregators = reorganizedAggs;
|
||||
nextIndex = 0;
|
||||
}
|
||||
|
||||
for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
|
||||
for (Aggregator[] aggregator : aggregators) {
|
||||
for (int j = nextIndex; j < aggregator.length; ++j) {
|
||||
aggregator[j].aggregate();
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < aggFactories.length; ++i) {
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
|
||||
aggregators[i][nextIndex].close();
|
||||
aggregators[i][nextIndex] = null;
|
||||
}
|
||||
|
||||
++nextIndex;
|
||||
++resultStorageIndex;
|
||||
rowIdProvider.incrementAndGet();
|
||||
}
|
||||
|
||||
// End Phase 3, anything left in the window needs to be collected and put into our results
|
||||
for (; nextIndex < windowSize; ++nextIndex) {
|
||||
for (int i = 0; i < aggFactories.length; ++i) {
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
|
||||
aggregators[i][nextIndex].close();
|
||||
aggregators[i][nextIndex] = null;
|
||||
}
|
||||
++resultStorageIndex;
|
||||
}
|
||||
|
||||
return makeReturnRAC(aggFactories, results);
|
||||
}
|
||||
|
||||
private AppendableRowsAndColumns aggregateWindowApertureInFlux(
|
||||
AggregatorFactory[] aggFactories,
|
||||
int lowerOffset,
|
||||
int upperOffset
|
||||
)
|
||||
{
|
||||
// In this case, we need to store a value for all items, so our windowSize is equivalent to the number of rows
|
||||
// from the RowsAndColumns object that we are using.
|
||||
int windowSize = rac.numRows();
|
||||
|
||||
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
|
||||
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
|
||||
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
|
||||
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
|
||||
// of trying to optimize this generic implementation.
|
||||
Object[][] results = new Object[aggFactories.length][windowSize];
|
||||
int resultStorageIndex = 0;
|
||||
|
||||
AtomicInteger rowIdProvider = new AtomicInteger(0);
|
||||
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
|
||||
|
||||
Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
|
||||
for (int i = 0; i < aggregators.length; i++) {
|
||||
final AggregatorFactory aggFactory = aggFactories[i];
|
||||
for (int j = 0; j < aggregators[i].length; j++) {
|
||||
aggregators[i][j] = aggFactory.factorize(columnSelectorFactory);
|
||||
}
|
||||
}
|
||||
|
||||
// This is the index to stop at for the current window aperture
|
||||
// The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row"
|
||||
int stopIndex = Math.min(lowerOffset + 1, windowSize);
|
||||
|
||||
int startIndex = 0;
|
||||
int rowId = rowIdProvider.get();
|
||||
while (rowId < windowSize) {
|
||||
for (Aggregator[] aggregator : aggregators) {
|
||||
for (int j = startIndex; j < stopIndex; ++j) {
|
||||
aggregator[j].aggregate();
|
||||
}
|
||||
}
|
||||
|
||||
if (rowId >= upperOffset) {
|
||||
for (int i = 0; i < aggregators.length; ++i) {
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
|
||||
aggregators[i][startIndex].close();
|
||||
aggregators[i][startIndex] = null;
|
||||
}
|
||||
|
||||
++resultStorageIndex;
|
||||
++startIndex;
|
||||
}
|
||||
|
||||
if (stopIndex < windowSize) {
|
||||
++stopIndex;
|
||||
}
|
||||
rowId = rowIdProvider.incrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
for (; startIndex < windowSize; ++startIndex) {
|
||||
for (int i = 0; i < aggregators.length; ++i) {
|
||||
results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
|
||||
aggregators[i][startIndex].close();
|
||||
aggregators[i][startIndex] = null;
|
||||
}
|
||||
++resultStorageIndex;
|
||||
}
|
||||
|
||||
return makeReturnRAC(aggFactories, results);
|
||||
}
|
||||
|
||||
private AppendableRowsAndColumns makeReturnRAC(AggregatorFactory[] aggFactories, Object[][] results)
|
||||
{
|
||||
for (int i = 0; i < aggFactories.length; ++i) {
|
||||
rac.addColumn(
|
||||
aggFactories[i].getName(), new ObjectArrayColumn(results[i], aggFactories[i].getIntermediateType())
|
||||
);
|
||||
}
|
||||
return rac;
|
||||
}
|
||||
|
||||
private static class CumulativeColumnSelectorFactory implements ColumnSelectorFactory
|
||||
{
|
||||
private final ColumnCapabilitiesImpl columnCapabilities;
|
||||
|
@ -831,16 +405,6 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
|
|||
.setType(factory.getIntermediateType());
|
||||
}
|
||||
|
||||
public void increment()
|
||||
{
|
||||
++index;
|
||||
}
|
||||
|
||||
public void decrement()
|
||||
{
|
||||
--index;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nonnull
|
||||
public DimensionSelector makeDimensionSelector(@Nonnull DimensionSpec dimensionSpec)
|
||||
|
|
|
@ -91,7 +91,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 2, null),
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 2, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -143,7 +143,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 2, false, 0, null),
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -2, false, 0, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -169,7 +169,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 7, null),
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 7, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -197,7 +197,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 1, null),
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 1, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -225,7 +225,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null),
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -253,7 +253,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 7, null),
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 7, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -337,7 +337,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
|
||||
|
||||
final RowsAndColumns results = agger.aggregateAll(
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null),
|
||||
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("sumFromLong", "intCol"),
|
||||
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
|
||||
|
@ -479,7 +479,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
1,
|
||||
-1,
|
||||
false,
|
||||
0,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
|
@ -517,7 +517,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
1,
|
||||
-1,
|
||||
false,
|
||||
1,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
|
@ -537,7 +537,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
1,
|
||||
-1,
|
||||
false,
|
||||
1,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
|
@ -556,7 +556,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase
|
|||
WindowFrame frame = new WindowFrame(
|
||||
PeerType.RANGE,
|
||||
false,
|
||||
1,
|
||||
-1,
|
||||
false,
|
||||
2,
|
||||
Collections.singletonList(ColumnWithDirection.ascending("c1"))
|
||||
|
|
|
@ -135,16 +135,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
|
|||
);
|
||||
}
|
||||
|
||||
if (isPrecedingOrFollowing(lowerBound) &&
|
||||
isPrecedingOrFollowing(upperBound) &&
|
||||
lowerBound.getKind() == upperBound.getKind()) {
|
||||
// this limitation can be lifted when https://github.com/apache/druid/issues/15739 is addressed
|
||||
throw buildCalciteContextException(
|
||||
"Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported.",
|
||||
windowOrId
|
||||
);
|
||||
}
|
||||
|
||||
boolean hasBounds = lowerBound != null || upperBound != null;
|
||||
if (call.getKind() == SqlKind.NTILE && hasBounds) {
|
||||
throw buildCalciteContextException(
|
||||
|
@ -162,7 +152,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
if (plannerContext.queryContext().isWindowingStrictValidation()) {
|
||||
if (!targetWindow.isRows() &&
|
||||
(!isUnboundedOrCurrent(lowerBound) || !isUnboundedOrCurrent(upperBound))) {
|
||||
|
|
|
@ -466,7 +466,9 @@ public class Windowing
|
|||
if (bound.isUnbounded() || bound.isCurrentRow()) {
|
||||
return 0;
|
||||
}
|
||||
return getConstant(((RexInputRef) bound.getOffset()).getIndex());
|
||||
|
||||
final int value = getConstant(((RexInputRef) bound.getOffset()).getIndex());
|
||||
return bound.isPreceding() ? -value : value;
|
||||
}
|
||||
|
||||
private int getConstant(int refIndex)
|
||||
|
|
|
@ -15561,26 +15561,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
assertThat(e, invalidSqlIs("Window frames with expression based lower/upper bounds are not supported. (line [1], column [31])"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUnSupportedWindowBoundTypes()
|
||||
{
|
||||
assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);
|
||||
|
||||
DruidException e;
|
||||
e = assertThrows(DruidException.class, () -> testBuilder()
|
||||
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
|
||||
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) from druid.foo")
|
||||
.run());
|
||||
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));
|
||||
|
||||
e = assertThrows(DruidException.class, () -> testBuilder()
|
||||
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
|
||||
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) from druid.foo")
|
||||
.run());
|
||||
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNtileNotSupportedWithFrame()
|
||||
{
|
||||
|
|
|
@ -278,6 +278,52 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
|
|||
.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWindowAllBoundsCombination()
|
||||
{
|
||||
testBuilder()
|
||||
.sql("select\n"
|
||||
+ "cityName,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 preceding) c1,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and current row) c2,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 following) c3,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and unbounded following) c4,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between 3 preceding and 1 preceding) c5,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between 1 preceding and current row) c6,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between 1 preceding and 1 FOLLOWING) c7,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between 1 preceding and unbounded FOLLOWING) c8,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and unbounded FOLLOWING) c9,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and 3 FOLLOWING) c10,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between current row and 1 following) c11,\n"
|
||||
+ "count(*) over (partition by cityName order by countryName rows between current row and unbounded following) c12\n"
|
||||
+ "from wikipedia\n"
|
||||
+ "where cityName in ('Vienna', 'Seoul')\n"
|
||||
+ "group by countryName, cityName, added")
|
||||
.queryContext(ImmutableMap.of(
|
||||
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
|
||||
QueryContexts.ENABLE_DEBUG, true
|
||||
))
|
||||
.expectedResults(ImmutableList.of(
|
||||
new Object[]{"Seoul", 0L, 1L, 2L, 13L, 0L, 1L, 2L, 13L, 12L, 3L, 2L, 13L},
|
||||
new Object[]{"Seoul", 1L, 2L, 3L, 13L, 1L, 2L, 3L, 13L, 11L, 3L, 2L, 12L},
|
||||
new Object[]{"Seoul", 2L, 3L, 4L, 13L, 2L, 2L, 3L, 12L, 10L, 3L, 2L, 11L},
|
||||
new Object[]{"Seoul", 3L, 4L, 5L, 13L, 3L, 2L, 3L, 11L, 9L, 3L, 2L, 10L},
|
||||
new Object[]{"Seoul", 4L, 5L, 6L, 13L, 3L, 2L, 3L, 10L, 8L, 3L, 2L, 9L},
|
||||
new Object[]{"Seoul", 5L, 6L, 7L, 13L, 3L, 2L, 3L, 9L, 7L, 3L, 2L, 8L},
|
||||
new Object[]{"Seoul", 6L, 7L, 8L, 13L, 3L, 2L, 3L, 8L, 6L, 3L, 2L, 7L},
|
||||
new Object[]{"Seoul", 7L, 8L, 9L, 13L, 3L, 2L, 3L, 7L, 5L, 3L, 2L, 6L},
|
||||
new Object[]{"Seoul", 8L, 9L, 10L, 13L, 3L, 2L, 3L, 6L, 4L, 3L, 2L, 5L},
|
||||
new Object[]{"Seoul", 9L, 10L, 11L, 13L, 3L, 2L, 3L, 5L, 3L, 3L, 2L, 4L},
|
||||
new Object[]{"Seoul", 10L, 11L, 12L, 13L, 3L, 2L, 3L, 4L, 2L, 2L, 2L, 3L},
|
||||
new Object[]{"Seoul", 11L, 12L, 13L, 13L, 3L, 2L, 3L, 3L, 1L, 1L, 2L, 2L},
|
||||
new Object[]{"Seoul", 12L, 13L, 13L, 13L, 3L, 2L, 2L, 2L, 0L, 0L, 1L, 1L},
|
||||
new Object[]{"Vienna", 0L, 1L, 2L, 3L, 0L, 1L, 2L, 3L, 2L, 2L, 2L, 3L},
|
||||
new Object[]{"Vienna", 1L, 2L, 3L, 3L, 1L, 2L, 3L, 3L, 1L, 1L, 2L, 2L},
|
||||
new Object[]{"Vienna", 2L, 3L, 3L, 3L, 2L, 2L, 2L, 2L, 0L, 0L, 1L, 1L}
|
||||
))
|
||||
.run();
|
||||
}
|
||||
|
||||
private WindowOperatorQuery getWindowOperatorQuery(List<Query<?>> queries)
|
||||
{
|
||||
assertEquals(1, queries.size());
|
||||
|
|
|
@ -18,7 +18,7 @@ expectedOperators:
|
|||
frame:
|
||||
peerType: "RANGE"
|
||||
lowUnbounded: false
|
||||
lowOffset: 3
|
||||
lowOffset: -3
|
||||
uppUnbounded: false
|
||||
uppOffset: 2
|
||||
orderBy:
|
||||
|
|
|
@ -17,7 +17,7 @@ expectedOperators:
|
|||
frame:
|
||||
peerType: "RANGE"
|
||||
lowUnbounded: false
|
||||
lowOffset: 3
|
||||
lowOffset: -3
|
||||
uppUnbounded: false
|
||||
uppOffset: 2
|
||||
orderBy: [ {column: "d1", direction: ASC} ]
|
||||
|
|
Loading…
Reference in New Issue