From b86f31f2c00fc07b585b556c4b0d0b910a074164 Mon Sep 17 00:00:00 2001 From: Soumyava <93540295+somu-imply@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:42:20 -0800 Subject: [PATCH] Addressing shapeshifting issues with window functions (#15807) Addressing shapeshifting issues with window functions --- .../druid/frame/segment/FrameSegment.java | 14 ++++++++ .../SegmentToRowsAndColumnsOperator.java | 3 +- .../concrete/FrameRowsAndColumns.java | 9 ++++- .../druid/segment/ArrayListSegment.java | 2 +- .../druid/segment/QueryableIndexSegment.java | 2 +- .../segment/ReferenceCountingSegment.java | 6 ++++ .../org/apache/druid/segment/Segment.java | 5 +++ .../SegmentToRowsAndColumnsOperatorTest.java | 8 +++-- .../sql/calcite/CalciteWindowQueryTest.java | 23 +++++++++++++ .../tests/window/rank_handling.sqlTest | 34 +++++++++++++++++++ 10 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameSegment.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameSegment.java index 8cd510532e6..706385f6e9c 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameSegment.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameSegment.java @@ -21,12 +21,15 @@ package org.apache.druid.frame.segment; import org.apache.druid.frame.Frame; import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.segment.CloseableShapeshifter; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; +import javax.annotation.Nonnull; import javax.annotation.Nullable; /** @@ -77,4 +80,15 @@ public class FrameSegment implements Segment { // Nothing to close. } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(@Nonnull Class clazz) + { + if (CloseableShapeshifter.class.equals(clazz)) { + return (T) new FrameRowsAndColumns(frame, frameReader.signature()); + } + return Segment.super.as(clazz); + } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java index 7155f655636..984623abd53 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java +++ b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.query.rowsandcols.RowsAndColumns; @@ -44,7 +45,7 @@ public class SegmentToRowsAndColumnsOperator implements Operator { try (final CloseableShapeshifter shifty = segment.as(CloseableShapeshifter.class)) { if (shifty == null) { - throw new ISE("Segment[%s] cannot shapeshift", segment.getClass()); + throw DruidException.defensive("Segment [%s] cannot shapeshift", segment.asString()); } RowsAndColumns rac; if (shifty instanceof RowsAndColumns) { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 59a97f12bf5..16dd988b77d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.segment.CloseableShapeshifter; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -36,7 +37,7 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.LinkedHashMap; -public class FrameRowsAndColumns implements RowsAndColumns +public class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter { private final Frame frame; private final RowSignature signature; @@ -91,4 +92,10 @@ public class FrameRowsAndColumns implements RowsAndColumns } return null; } + + @Override + public void close() + { + // nothing to close + } } diff --git a/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java b/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java index 0e597e29347..e7423b5b89d 100644 --- a/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java @@ -109,7 +109,7 @@ public class ArrayListSegment implements Segment if (CloseableShapeshifter.class.equals(clazz)) { return (T) new MyCloseableShapeshifter(); } - return null; + return Segment.super.as(clazz); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java index f145c1db9cd..9d75748b416 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java @@ -81,6 +81,6 @@ public class QueryableIndexSegment implements Segment return (T) new QueryableIndexRowsAndColumns(index); } - return null; + return Segment.super.as(clazz); } } diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java index 55e390463b5..b6ce3860b2f 100644 --- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java @@ -184,4 +184,10 @@ public class ReferenceCountingSegment extends ReferenceCountingCloseableObject> queries) { assertEquals(1, queries.size()); diff --git a/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest new file mode 100644 index 00000000000..0e66ed87460 --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest @@ -0,0 +1,34 @@ +type: "operatorValidation" + +sql: | + SELECT + __time + , dim1 + , m1 + , sum(m2) as summ2 + , RANK() OVER (PARTITION BY __time ORDER BY sum(m2) DESC) AS rank1 + FROM foo + WHERE m1 IN (5,6) + GROUP BY + __time, + dim1, + m1 + +expectedOperators: + - type: "naiveSort" + columns: + - column: "d0" + direction: "ASC" + - column: "a0" + direction: "DESC" + - { type: "naivePartition", partitionColumns: [ d0 ] } + - type: "window" + processor: + type: "rank" + group: [ a0 ] + outputColumn: w0 + asPercent: false + +expectedResults: + - [ 978393600000, "def", 5, 5, 1 ] + - [ 978480000000, "abc", 6, 6, 1 ]