diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java index b70df2c5203..fca50c25b28 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java @@ -161,9 +161,9 @@ public class WindowFrame public int getLowerOffsetClamped(int maxRows) { if (lowerUnbounded) { - return maxRows; + return -maxRows; } - return Math.min(maxRows, lowerOffset); + return Math.max(-maxRows, lowerOffset); } /** diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java index 106fa9674a0..19bc6170c12 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java @@ -29,7 +29,6 @@ import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.operator.window.WindowFrame; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -61,34 +60,15 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable AggregatorFactory[] aggFactories ) { - if (frame.isLowerUnbounded() && frame.isUpperUnbounded()) { - return computeUnboundedAggregates(aggFactories); - } - - if (frame.getPeerType() == WindowFrame.PeerType.ROWS) { - if (frame.isLowerUnbounded()) { - return computeCumulativeAggregates(aggFactories, frame.getUpperOffset()); - } else if (frame.isUpperUnbounded()) { - return computeReverseCumulativeAggregates(aggFactories, frame.getLowerOffset()); - } else { - final int numRows = rac.numRows(); - int lowerOffset = frame.getLowerOffset(); - int upperOffset = frame.getUpperOffset(); - - if (numRows < lowerOffset + upperOffset + 1) { - // In this case, there are not enough rows to completely build up the full window aperture before it needs to - // also start contracting the aperture because of the upper offset. So we use a method that specifically - // handles checks for both expanding and reducing the aperture on every iteration. - return aggregateWindowApertureInFlux(aggFactories, lowerOffset, upperOffset); - } else { - // In this case, there are 3 distinct phases that allow us to loop with less - // branches, so we have a method that specifically does that. - return aggregateWindowApertureWellBehaved(aggFactories, lowerOffset, upperOffset); - } - } - } else { - return computeGroupAggregates(aggFactories, frame); + Iterable groupIterator = buildIteratorFor(rac, frame); + ResultPopulator resultRac = new ResultPopulator(aggFactories, rac.numRows()); + AggIntervalCursor aggCursor = new AggIntervalCursor(rac, aggFactories); + for (AggInterval aggInterval : groupIterator) { + aggCursor.moveTo(aggInterval.inputRows); + resultRac.write(aggInterval.outputRows, aggCursor); } + resultRac.appendTo(rac); + return rac; } /** @@ -124,22 +104,34 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable } } - private RowsAndColumns computeGroupAggregates( - AggregatorFactory[] aggFactories, - WindowFrame frame) + public static Iterable buildIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) { - Iterable groupIterator = buildGroupIteratorFor(rac, frame); - ResultPopulator resultRac = new ResultPopulator(aggFactories, rac.numRows()); - AggIntervalCursor aggCursor = new AggIntervalCursor(rac, aggFactories); - for (AggInterval aggInterval : groupIterator) { - aggCursor.moveTo(aggInterval.inputRows); - resultRac.write(aggInterval.outputRows, aggCursor); + int numRows = rac.numRows(); + if (frame.getLowerOffsetClamped(numRows) == -numRows && frame.getUpperOffsetClamped(numRows) == numRows) { + return buildUnboundedIteratorFor(rac, frame); + } else if (frame.getPeerType() == WindowFrame.PeerType.RANGE) { + return buildGroupIteratorFor(rac, frame); + } else { + return buildRowIteratorFor(rac, frame); } - resultRac.appendTo(rac); - return rac; } - public static Iterable buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) + private static Iterable buildUnboundedIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) + { + int[] groupBoundaries = new int[]{0, rac.numRows()}; + return new GroupIteratorForWindowFrame(frame, groupBoundaries); + } + + private static Iterable buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) + { + int[] groupBoundaries = new int[rac.numRows() + 1]; + for (int j = 0; j < groupBoundaries.length; j++) { + groupBoundaries[j] = j; + } + return new GroupIteratorForWindowFrame(frame, groupBoundaries); + } + + private static Iterable buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) { int[] groupBoundaries = ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames()); return new GroupIteratorForWindowFrame(frame, groupBoundaries); @@ -187,7 +179,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable groupToRowIndex(relativeGroupId(1)) ), Interval.of( - groupToRowIndex(relativeGroupId(-lowerOffset)), + groupToRowIndex(relativeGroupId(lowerOffset)), groupToRowIndex(relativeGroupId(upperOffset)) ) ); @@ -365,6 +357,10 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable for (int i = currentRows.b; i < newRows.b; i++) { aggregate(i); } + } else if (currentRows.a > newRows.a && currentRows.b == newRows.b) { + for (int i = newRows.a; i < currentRows.a; i++) { + aggregate(i); + } } else { newAggregators(); for (int i : newRows) { @@ -390,428 +386,6 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable } } - private AppendableRowsAndColumns computeUnboundedAggregates(AggregatorFactory[] aggFactories) - { - Aggregator[] aggs = new Aggregator[aggFactories.length]; - - - AtomicInteger currRow = new AtomicInteger(0); - final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(currRow); - - for (int i = 0; i < aggFactories.length; i++) { - aggs[i] = aggFactories[i].factorize(columnSelectorFactory); - } - - int numRows = rac.numRows(); - int rowId = currRow.get(); - while (rowId < numRows) { - for (Aggregator agg : aggs) { - agg.aggregate(); - } - rowId = currRow.incrementAndGet(); - } - - for (int i = 0; i < aggFactories.length; ++i) { - rac.addColumn( - aggFactories[i].getName(), - new ConstantObjectColumn(aggs[i].get(), numRows, aggFactories[i].getIntermediateType()) - ); - aggs[i].close(); - } - return rac; - } - - private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[] aggFactories, int upperOffset) - { - int numRows = rac.numRows(); - if (upperOffset > numRows) { - return computeUnboundedAggregates(aggFactories); - } - - - // We store the results in an Object array for convenience. This is definitely sub-par from a memory management - // point of view as we should use native arrays when possible. This will be fine for now, but it probably makes - // sense to look at optimizing this in the future. That said, such an optimization might best come by having - // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead - // of trying to optimize this generic implementation. - Object[][] results = new Object[aggFactories.length][numRows]; - int resultStorageIndex = 0; - - AtomicInteger rowIdProvider = new AtomicInteger(0); - final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider); - - AggregatorFactory[] combiningFactories = new AggregatorFactory[aggFactories.length]; - Aggregator[] aggs = new Aggregator[aggFactories.length]; - for (int i = 0; i < aggFactories.length; i++) { - combiningFactories[i] = aggFactories[i].getCombiningFactory(); - aggs[i] = aggFactories[i].factorize(columnSelectorFactory); - } - - // If there is an upper offset, we accumulate those aggregations before starting to generate results - for (int i = 0; i < upperOffset; ++i) { - for (Aggregator agg : aggs) { - agg.aggregate(); - } - rowIdProvider.incrementAndGet(); - } - - // Prime the results - if (rowIdProvider.get() < numRows) { - for (int i = 0; i < aggs.length; i++) { - aggs[i].aggregate(); - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); - aggs[i].close(); - aggs[i] = aggFactories[i].factorize(columnSelectorFactory); - } - - ++resultStorageIndex; - rowIdProvider.incrementAndGet(); - } - - // From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation - for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) { - for (int i = 0; i < aggs.length; i++) { - aggs[i].aggregate(); - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); - aggs[i].close(); - - // Use a combining aggregator to combine the result we just got with the result from the previous row - // This is a lot of hoops to jump through just to combine two values, but AggregatorFactory.combine - // allows for mutation of either of the arguments passed in, so it cannot be meaningfully used in this - // context. Instead, we have to jump through these hoops to make sure that we are generating a new object. - // It would've been nice if the AggregatorFactory interface had methods that were more usable for this, - // but it doesn't so :shrug: - final CumulativeColumnSelectorFactory combiningFactory = new CumulativeColumnSelectorFactory( - aggFactories[i], - results[i], - resultStorageIndex - 1 - ); - final Aggregator combiningAgg = combiningFactories[i].factorize(combiningFactory); - combiningAgg.aggregate(); - combiningFactory.increment(); - combiningAgg.aggregate(); - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get()); - combiningAgg.close(); - - aggs[i] = aggFactories[i].factorize(columnSelectorFactory); - } - - ++resultStorageIndex; - rowIdProvider.incrementAndGet(); - } - - // If we haven't filled up all of the results yet, there are no more rows, so just point the rest of the results - // at the last result that we generated - for (Object[] resultArr : results) { - Arrays.fill(resultArr, resultStorageIndex, resultArr.length, resultArr[resultStorageIndex - 1]); - } - - return makeReturnRAC(aggFactories, results); - } - - private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFactory[] aggFactories, int lowerOffset) - { - int numRows = rac.numRows(); - if (lowerOffset > numRows) { - return computeUnboundedAggregates(aggFactories); - } - - // We store the results in an Object array for convenience. This is definitely sub-par from a memory management - // point of view as we should use native arrays when possible. This will be fine for now, but it probably makes - // sense to look at optimizing this in the future. That said, such an optimization might best come by having - // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead - // of trying to optimize this generic implementation. - Object[][] results = new Object[aggFactories.length][numRows]; - int resultStorageIndex = numRows - 1; - - AtomicInteger rowIdProvider = new AtomicInteger(numRows - 1); - final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider); - - AggregatorFactory[] combiningFactories = new AggregatorFactory[aggFactories.length]; - Aggregator[] aggs = new Aggregator[aggFactories.length]; - for (int i = 0; i < aggFactories.length; i++) { - combiningFactories[i] = aggFactories[i].getCombiningFactory(); - aggs[i] = aggFactories[i].factorize(columnSelectorFactory); - } - - // If there is a lower offset, we accumulate those aggregations before starting to generate results - for (int i = 0; i < lowerOffset; ++i) { - for (Aggregator agg : aggs) { - agg.aggregate(); - } - rowIdProvider.decrementAndGet(); - } - - // Prime the results - if (rowIdProvider.get() >= 0) { - for (int i = 0; i < aggs.length; i++) { - aggs[i].aggregate(); - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); - aggs[i].close(); - aggs[i] = aggFactories[i].factorize(columnSelectorFactory); - } - - --resultStorageIndex; - rowIdProvider.decrementAndGet(); - } - - // From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation - for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) { - for (int i = 0; i < aggs.length; i++) { - aggs[i].aggregate(); - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); - aggs[i].close(); - - // Use a combining aggregator to combine the result we just got with the result from the previous row - // This is a lot of hoops to jump through just to combine two values, but AggregatorFactory.combine - // allows for mutation of either of the arguments passed in, so it cannot be meaningfully used in this - // context. Instead, we have to jump through these hoops to make sure that we are generating a new object. - // It would've been nice if the AggregatorFactory interface had methods that were more usable for this, - // but it doesn't so :shrug: - final CumulativeColumnSelectorFactory combiningFactory = new CumulativeColumnSelectorFactory( - aggFactories[i], - results[i], - resultStorageIndex + 1 - ); - final Aggregator combiningAgg = combiningFactories[i].factorize(combiningFactory); - combiningAgg.aggregate(); - combiningFactory.decrement(); - combiningAgg.aggregate(); - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get()); - combiningAgg.close(); - - aggs[i] = aggFactories[i].factorize(columnSelectorFactory); - } - - --resultStorageIndex; - rowIdProvider.decrementAndGet(); - } - - // If we haven't filled up all of the results yet, there are no more rows, so just point the rest of the results - // at the last result that we generated - for (Object[] resultArr : results) { - Arrays.fill(resultArr, 0, resultStorageIndex + 1, resultArr[resultStorageIndex + 1]); - } - - return makeReturnRAC(aggFactories, results); - } - - private AppendableRowsAndColumns aggregateWindowApertureWellBehaved( - AggregatorFactory[] aggFactories, - int lowerOffset, - int upperOffset - ) - { - // There are 3 different phases of operation when we have more rows than our window size - // 1. Our window is not full, as we walk the rows we build up towards filling it - // 2. Our window is full, as we walk the rows we take a value off and add a new aggregation - // 3. We are nearing the end of the rows, we need to start shrinking the window aperture - - int numRows = rac.numRows(); - int windowSize = lowerOffset + upperOffset + 1; - - // We store the results in an Object array for convenience. This is definitely sub-par from a memory management - // point of view as we should use native arrays when possible. This will be fine for now, but it probably makes - // sense to look at optimizing this in the future. That said, such an optimization might best come by having - // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead - // of trying to optimize this generic implementation. - Object[][] results = new Object[aggFactories.length][numRows]; - int resultStorageIndex = 0; - - AtomicInteger rowIdProvider = new AtomicInteger(0); - final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider); - - // This is the number of aggregators to actually aggregate for the current row. - // Which also doubles as the nextIndex to roll through as we roll things in and out of the window - int nextIndex = lowerOffset + 1; - - Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize]; - for (int i = 0; i < aggregators.length; i++) { - final AggregatorFactory aggFactory = aggFactories[i]; - // instantiate the aggregators that need to be read on the first row. - for (int j = 0; j < nextIndex; j++) { - aggregators[i][j] = aggFactory.factorize(columnSelectorFactory); - } - } - - // The first few rows will slowly build out the window to consume the upper-offset. The window will not - // be full until we have walked upperOffset number of rows, so phase 1 runs until we have consumed - // upperOffset number of rows. - for (int upperIndex = 0; upperIndex < upperOffset; ++upperIndex) { - for (Aggregator[] aggregator : aggregators) { - for (int j = 0; j < nextIndex; ++j) { - aggregator[j].aggregate(); - } - } - - for (int i = 0; i < aggFactories.length; ++i) { - aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory); - } - ++nextIndex; - rowIdProvider.incrementAndGet(); - } - - // End Phase 1, Enter Phase 2. At this point, nextIndex == windowSize, rowIdProvider is the same as - // upperOffset and the aggregators matrix is entirely non-null. We need to iterate until our window has all of - // the aggregators in it to fill up the final result set. - int endResultStorageIndex = numRows - windowSize; - for (; resultStorageIndex < endResultStorageIndex; ++resultStorageIndex) { - for (Aggregator[] aggregator : aggregators) { - for (Aggregator value : aggregator) { - value.aggregate(); - } - } - - if (nextIndex == windowSize) { - // Wrap back around and start pruning from the beginning of the window - nextIndex = 0; - } - - for (int i = 0; i < aggFactories.length; ++i) { - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get()); - aggregators[i][nextIndex].close(); - aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory); - } - - ++nextIndex; - rowIdProvider.incrementAndGet(); - } - - if (nextIndex == windowSize) { - nextIndex = 0; - } - - // End Phase 2, enter Phase 3. At this point, our window has enough aggregators in it to fill up our final - // result set. This means that for each new row that we complete, the window will "shrink" until we hit numRows, - // at which point we will collect anything yet remaining and be done. - - if (nextIndex != 0) { - // Start by organizing the aggregators so that we are 0-indexed from nextIndex. This trades off creating - // a new array of references in exchange for removing branches inside of the loop. It also makes the logic - // simpler to understand. - - Aggregator[][] reorganizedAggs = new Aggregator[aggFactories.length][windowSize]; - for (int i = 0; i < aggFactories.length; i++) { - System.arraycopy(aggregators[i], nextIndex, reorganizedAggs[i], 0, windowSize - nextIndex); - System.arraycopy(aggregators[i], 0, reorganizedAggs[i], windowSize - nextIndex, nextIndex); - } - aggregators = reorganizedAggs; - nextIndex = 0; - } - - for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) { - for (Aggregator[] aggregator : aggregators) { - for (int j = nextIndex; j < aggregator.length; ++j) { - aggregator[j].aggregate(); - } - } - - for (int i = 0; i < aggFactories.length; ++i) { - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get()); - aggregators[i][nextIndex].close(); - aggregators[i][nextIndex] = null; - } - - ++nextIndex; - ++resultStorageIndex; - rowIdProvider.incrementAndGet(); - } - - // End Phase 3, anything left in the window needs to be collected and put into our results - for (; nextIndex < windowSize; ++nextIndex) { - for (int i = 0; i < aggFactories.length; ++i) { - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get()); - aggregators[i][nextIndex].close(); - aggregators[i][nextIndex] = null; - } - ++resultStorageIndex; - } - - return makeReturnRAC(aggFactories, results); - } - - private AppendableRowsAndColumns aggregateWindowApertureInFlux( - AggregatorFactory[] aggFactories, - int lowerOffset, - int upperOffset - ) - { - // In this case, we need to store a value for all items, so our windowSize is equivalent to the number of rows - // from the RowsAndColumns object that we are using. - int windowSize = rac.numRows(); - - // We store the results in an Object array for convenience. This is definitely sub-par from a memory management - // point of view as we should use native arrays when possible. This will be fine for now, but it probably makes - // sense to look at optimizing this in the future. That said, such an optimization might best come by having - // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead - // of trying to optimize this generic implementation. - Object[][] results = new Object[aggFactories.length][windowSize]; - int resultStorageIndex = 0; - - AtomicInteger rowIdProvider = new AtomicInteger(0); - final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider); - - Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize]; - for (int i = 0; i < aggregators.length; i++) { - final AggregatorFactory aggFactory = aggFactories[i]; - for (int j = 0; j < aggregators[i].length; j++) { - aggregators[i][j] = aggFactory.factorize(columnSelectorFactory); - } - } - - // This is the index to stop at for the current window aperture - // The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row" - int stopIndex = Math.min(lowerOffset + 1, windowSize); - - int startIndex = 0; - int rowId = rowIdProvider.get(); - while (rowId < windowSize) { - for (Aggregator[] aggregator : aggregators) { - for (int j = startIndex; j < stopIndex; ++j) { - aggregator[j].aggregate(); - } - } - - if (rowId >= upperOffset) { - for (int i = 0; i < aggregators.length; ++i) { - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get()); - aggregators[i][startIndex].close(); - aggregators[i][startIndex] = null; - } - - ++resultStorageIndex; - ++startIndex; - } - - if (stopIndex < windowSize) { - ++stopIndex; - } - rowId = rowIdProvider.incrementAndGet(); - } - - - for (; startIndex < windowSize; ++startIndex) { - for (int i = 0; i < aggregators.length; ++i) { - results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get()); - aggregators[i][startIndex].close(); - aggregators[i][startIndex] = null; - } - ++resultStorageIndex; - } - - return makeReturnRAC(aggFactories, results); - } - - private AppendableRowsAndColumns makeReturnRAC(AggregatorFactory[] aggFactories, Object[][] results) - { - for (int i = 0; i < aggFactories.length; ++i) { - rac.addColumn( - aggFactories[i].getName(), new ObjectArrayColumn(results[i], aggFactories[i].getIntermediateType()) - ); - } - return rac; - } - private static class CumulativeColumnSelectorFactory implements ColumnSelectorFactory { private final ColumnCapabilitiesImpl columnCapabilities; @@ -831,16 +405,6 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable .setType(factory.getIntermediateType()); } - public void increment() - { - ++index; - } - - public void decrement() - { - --index; - } - @Override @Nonnull public DimensionSelector makeDimensionSelector(@Nonnull DimensionSpec dimensionSpec) diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java index d00e12b3b66..d5b11f7a612 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java @@ -91,7 +91,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 2, null), + new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 2, null), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -143,7 +143,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 2, false, 0, null), + new WindowFrame(WindowFrame.PeerType.ROWS, false, -2, false, 0, null), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -169,7 +169,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 7, null), + new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 7, null), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -197,7 +197,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 1, null), + new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 1, null), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -225,7 +225,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null), + new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -253,7 +253,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 7, null), + new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 7, null), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -337,7 +337,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null), + new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -479,7 +479,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase WindowFrame frame = new WindowFrame( PeerType.RANGE, false, - 1, + -1, false, 0, Collections.singletonList(ColumnWithDirection.ascending("c1")) @@ -517,7 +517,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase WindowFrame frame = new WindowFrame( PeerType.RANGE, false, - 1, + -1, false, 1, Collections.singletonList(ColumnWithDirection.ascending("c1")) @@ -537,7 +537,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase WindowFrame frame = new WindowFrame( PeerType.RANGE, false, - 1, + -1, false, 1, Collections.singletonList(ColumnWithDirection.ascending("c1")) @@ -556,7 +556,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase WindowFrame frame = new WindowFrame( PeerType.RANGE, false, - 1, + -1, false, 2, Collections.singletonList(ColumnWithDirection.ascending("c1")) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index a6319c40f66..d399786676c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -135,16 +135,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator ); } - if (isPrecedingOrFollowing(lowerBound) && - isPrecedingOrFollowing(upperBound) && - lowerBound.getKind() == upperBound.getKind()) { - // this limitation can be lifted when https://github.com/apache/druid/issues/15739 is addressed - throw buildCalciteContextException( - "Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported.", - windowOrId - ); - } - boolean hasBounds = lowerBound != null || upperBound != null; if (call.getKind() == SqlKind.NTILE && hasBounds) { throw buildCalciteContextException( @@ -162,7 +152,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator } } - if (plannerContext.queryContext().isWindowingStrictValidation()) { if (!targetWindow.isRows() && (!isUnboundedOrCurrent(lowerBound) || !isUnboundedOrCurrent(upperBound))) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index 4f0f0eda21b..afd775ef4ee 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -466,7 +466,9 @@ public class Windowing if (bound.isUnbounded() || bound.isCurrentRow()) { return 0; } - return getConstant(((RexInputRef) bound.getOffset()).getIndex()); + + final int value = getConstant(((RexInputRef) bound.getOffset()).getIndex()); + return bound.isPreceding() ? -value : value; } private int getConstant(int refIndex) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 9a0a0318210..ec40fb3f871 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -15561,26 +15561,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest assertThat(e, invalidSqlIs("Window frames with expression based lower/upper bounds are not supported. (line [1], column [31])")); } - - @Test - public void testUnSupportedWindowBoundTypes() - { - assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS); - - DruidException e; - e = assertThrows(DruidException.class, () -> testBuilder() - .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true)) - .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) from druid.foo") - .run()); - assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])")); - - e = assertThrows(DruidException.class, () -> testBuilder() - .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true)) - .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) from druid.foo") - .run()); - assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])")); - } - @Test public void testNtileNotSupportedWithFrame() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index f5e161b3a6a..cdc4bc9cbf9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -278,6 +278,52 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest .run(); } + @Test + public void testWindowAllBoundsCombination() + { + testBuilder() + .sql("select\n" + + "cityName,\n" + + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 preceding) c1,\n" + + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and current row) c2,\n" + + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 following) c3,\n" + + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and unbounded following) c4,\n" + + "count(*) over (partition by cityName order by countryName rows between 3 preceding and 1 preceding) c5,\n" + + "count(*) over (partition by cityName order by countryName rows between 1 preceding and current row) c6,\n" + + "count(*) over (partition by cityName order by countryName rows between 1 preceding and 1 FOLLOWING) c7,\n" + + "count(*) over (partition by cityName order by countryName rows between 1 preceding and unbounded FOLLOWING) c8,\n" + + "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and unbounded FOLLOWING) c9,\n" + + "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and 3 FOLLOWING) c10,\n" + + "count(*) over (partition by cityName order by countryName rows between current row and 1 following) c11,\n" + + "count(*) over (partition by cityName order by countryName rows between current row and unbounded following) c12\n" + + "from wikipedia\n" + + "where cityName in ('Vienna', 'Seoul')\n" + + "group by countryName, cityName, added") + .queryContext(ImmutableMap.of( + PlannerContext.CTX_ENABLE_WINDOW_FNS, true, + QueryContexts.ENABLE_DEBUG, true + )) + .expectedResults(ImmutableList.of( + new Object[]{"Seoul", 0L, 1L, 2L, 13L, 0L, 1L, 2L, 13L, 12L, 3L, 2L, 13L}, + new Object[]{"Seoul", 1L, 2L, 3L, 13L, 1L, 2L, 3L, 13L, 11L, 3L, 2L, 12L}, + new Object[]{"Seoul", 2L, 3L, 4L, 13L, 2L, 2L, 3L, 12L, 10L, 3L, 2L, 11L}, + new Object[]{"Seoul", 3L, 4L, 5L, 13L, 3L, 2L, 3L, 11L, 9L, 3L, 2L, 10L}, + new Object[]{"Seoul", 4L, 5L, 6L, 13L, 3L, 2L, 3L, 10L, 8L, 3L, 2L, 9L}, + new Object[]{"Seoul", 5L, 6L, 7L, 13L, 3L, 2L, 3L, 9L, 7L, 3L, 2L, 8L}, + new Object[]{"Seoul", 6L, 7L, 8L, 13L, 3L, 2L, 3L, 8L, 6L, 3L, 2L, 7L}, + new Object[]{"Seoul", 7L, 8L, 9L, 13L, 3L, 2L, 3L, 7L, 5L, 3L, 2L, 6L}, + new Object[]{"Seoul", 8L, 9L, 10L, 13L, 3L, 2L, 3L, 6L, 4L, 3L, 2L, 5L}, + new Object[]{"Seoul", 9L, 10L, 11L, 13L, 3L, 2L, 3L, 5L, 3L, 3L, 2L, 4L}, + new Object[]{"Seoul", 10L, 11L, 12L, 13L, 3L, 2L, 3L, 4L, 2L, 2L, 2L, 3L}, + new Object[]{"Seoul", 11L, 12L, 13L, 13L, 3L, 2L, 3L, 3L, 1L, 1L, 2L, 2L}, + new Object[]{"Seoul", 12L, 13L, 13L, 13L, 3L, 2L, 2L, 2L, 0L, 0L, 1L, 1L}, + new Object[]{"Vienna", 0L, 1L, 2L, 3L, 0L, 1L, 2L, 3L, 2L, 2L, 2L, 3L}, + new Object[]{"Vienna", 1L, 2L, 3L, 3L, 1L, 2L, 3L, 3L, 1L, 1L, 2L, 2L}, + new Object[]{"Vienna", 2L, 3L, 3L, 3L, 2L, 2L, 2L, 2L, 0L, 0L, 1L, 1L} + )) + .run(); + } + private WindowOperatorQuery getWindowOperatorQuery(List> queries) { assertEquals(1, queries.size()); diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest index 221f7a52bcd..b8da0dbd658 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest @@ -18,7 +18,7 @@ expectedOperators: frame: peerType: "RANGE" lowUnbounded: false - lowOffset: 3 + lowOffset: -3 uppUnbounded: false uppOffset: 2 orderBy: diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest index 12e5736951b..07cfadfb680 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest @@ -17,7 +17,7 @@ expectedOperators: frame: peerType: "RANGE" lowUnbounded: false - lowOffset: 3 + lowOffset: -3 uppUnbounded: false uppOffset: 2 orderBy: [ {column: "d1", direction: ASC} ]