Addressing shapeshifting issues with window functions (#15807)

Addressing shapeshifting issues with window functions
This commit is contained in:
Soumyava 2024-02-05 21:42:20 -08:00 committed by GitHub
parent 392d585ff8
commit b86f31f2c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 100 additions and 6 deletions

View File

@ -21,12 +21,15 @@ package org.apache.druid.frame.segment;
import org.apache.druid.frame.Frame; import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -77,4 +80,15 @@ public class FrameSegment implements Segment
{ {
// Nothing to close. // Nothing to close.
} }
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(@Nonnull Class<T> clazz)
{
if (CloseableShapeshifter.class.equals(clazz)) {
return (T) new FrameRowsAndColumns(frame, frameReader.signature());
}
return Segment.super.as(clazz);
}
} }

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator; package org.apache.druid.query.operator;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns;
@ -44,7 +45,7 @@ public class SegmentToRowsAndColumnsOperator implements Operator
{ {
try (final CloseableShapeshifter shifty = segment.as(CloseableShapeshifter.class)) { try (final CloseableShapeshifter shifty = segment.as(CloseableShapeshifter.class)) {
if (shifty == null) { if (shifty == null) {
throw new ISE("Segment[%s] cannot shapeshift", segment.getClass()); throw DruidException.defensive("Segment [%s] cannot shapeshift", segment.asString());
} }
RowsAndColumns rac; RowsAndColumns rac;
if (shifty instanceof RowsAndColumns) { if (shifty instanceof RowsAndColumns) {

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
@ -36,7 +37,7 @@ import javax.annotation.Nullable;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
public class FrameRowsAndColumns implements RowsAndColumns public class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
{ {
private final Frame frame; private final Frame frame;
private final RowSignature signature; private final RowSignature signature;
@ -91,4 +92,10 @@ public class FrameRowsAndColumns implements RowsAndColumns
} }
return null; return null;
} }
@Override
public void close()
{
// nothing to close
}
} }

View File

@ -109,7 +109,7 @@ public class ArrayListSegment<RowType> implements Segment
if (CloseableShapeshifter.class.equals(clazz)) { if (CloseableShapeshifter.class.equals(clazz)) {
return (T) new MyCloseableShapeshifter(); return (T) new MyCloseableShapeshifter();
} }
return null; return Segment.super.as(clazz);
} }
@Override @Override

View File

@ -81,6 +81,6 @@ public class QueryableIndexSegment implements Segment
return (T) new QueryableIndexRowsAndColumns(index); return (T) new QueryableIndexRowsAndColumns(index);
} }
return null; return Segment.super.as(clazz);
} }
} }

View File

@ -184,4 +184,10 @@ public class ReferenceCountingSegment extends ReferenceCountingCloseableObject<S
} }
return baseObject.as(clazz); return baseObject.as(clazz);
} }
@Override
public String asString()
{
return baseObject.asString();
}
} }

View File

@ -69,4 +69,9 @@ public interface Segment extends Closeable
} }
return null; return null;
} }
default String asString()
{
return getClass().toString();
}
} }

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.operator; package org.apache.druid.query.operator;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
@ -88,8 +89,11 @@ public class SegmentToRowsAndColumnsOperatorTest
try { try {
Operator.go(op, new ExceptionalReceiver()); Operator.go(op, new ExceptionalReceiver());
} }
catch (ISE e) { catch (DruidException e) {
Assert.assertEquals(e.getMessage(), "Segment[class org.apache.druid.segment.TestSegmentForAs] cannot shapeshift"); Assert.assertEquals(
e.getMessage(),
"Segment [class org.apache.druid.segment.TestSegmentForAs] cannot shapeshift"
);
exceptionThrown = true; exceptionThrown = true;
} }
Assert.assertTrue(exceptionThrown); Assert.assertTrue(exceptionThrown);

View File

@ -222,6 +222,29 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
} }
} }
@Test
@SuppressWarnings("unchecked")
public void windowQueryTestWithCustomContextMaxSubqueryBytes() throws Exception
{
TestCase testCase = new TestCase(filename);
assumeThat(testCase.getType(), Matchers.not(TestType.failingTest));
if (testCase.getType() == TestType.operatorValidation) {
testBuilder()
.skipVectorize(true)
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000",
QueryContexts.WINDOWING_STRICT_VALIDATION, false
)
)
.addCustomVerification(QueryVerification.ofResults(testCase))
.run();
}
}
private WindowOperatorQuery getWindowOperatorQuery(List<Query<?>> queries) private WindowOperatorQuery getWindowOperatorQuery(List<Query<?>> queries)
{ {
assertEquals(1, queries.size()); assertEquals(1, queries.size());

View File

@ -0,0 +1,34 @@
type: "operatorValidation"
sql: |
SELECT
__time
, dim1
, m1
, sum(m2) as summ2
, RANK() OVER (PARTITION BY __time ORDER BY sum(m2) DESC) AS rank1
FROM foo
WHERE m1 IN (5,6)
GROUP BY
__time,
dim1,
m1
expectedOperators:
- type: "naiveSort"
columns:
- column: "d0"
direction: "ASC"
- column: "a0"
direction: "DESC"
- { type: "naivePartition", partitionColumns: [ d0 ] }
- type: "window"
processor:
type: "rank"
group: [ a0 ]
outputColumn: w0
asPercent: false
expectedResults:
- [ 978393600000, "def", 5, 5, 1 ]
- [ 978480000000, "abc", 6, 6, 1 ]