MSQ window functions: Reject MVDs during window processing (#17036)

* MSQ window functions: Reject MVDs during window processing

* MSQ window functions: Reject MVDs during window processing

* Remove parameterization from MSQWindowTest
This commit is contained in:
Akshat Jain 2024-09-23 11:39:35 +05:30 committed by GitHub
parent df680bab05
commit 40414cfe78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 200 additions and 190 deletions

View File

@ -34,6 +34,7 @@ import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQException;
@ -54,6 +55,7 @@ import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;
@ -451,6 +453,14 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
int match = 0;
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
if (ColumnType.STRING.equals(frameReader.signature().getColumnType(columnName).get()) && (row1.get(i) instanceof List || row2.get(i) instanceof List)) {
// special handling to reject MVDs
throw new UOE(
"Encountered a multi value column [%s]. Window processing does not support MVDs. "
+ "Consider using UNNEST or MV_TO_ARRAY.",
columnName
);
}
if (typeStrategies[i].compare(row1.get(i), row2.get(i)) == 0) {
match++;
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.MSQSpec;
@ -64,11 +65,10 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.SegmentId;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.hamcrest.CoreMatchers;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -77,19 +77,8 @@ import java.util.Map;
public class MSQWindowTest extends MSQTestBase
{
public static Collection<Object[]> data()
{
Object[][] data = new Object[][]{
{DEFAULT, DEFAULT_MSQ_CONTEXT}
};
return Arrays.asList(data);
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithPartitionByAndInnerGroupBy(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithPartitionByAndInnerGroupBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -108,7 +97,7 @@ public class MSQWindowTest extends MSQTestBase
ColumnType.FLOAT
)
))
.setContext(context)
.setContext(DEFAULT_MSQ_CONTEXT)
.build();
@ -121,7 +110,7 @@ public class MSQWindowTest extends MSQTestBase
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(groupByQuery),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"))),
@ -151,7 +140,7 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0},
new Object[]{6.0f, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
@ -170,9 +159,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -198,7 +186,7 @@ public class MSQWindowTest extends MSQTestBase
ColumnType.DOUBLE
)
))
.setContext(context)
.setContext(DEFAULT_MSQ_CONTEXT)
.build();
@ -215,7 +203,7 @@ public class MSQWindowTest extends MSQTestBase
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(groupByQuery),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("d0", ColumnType.FLOAT)
.add("d1", ColumnType.DOUBLE)
@ -258,7 +246,7 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0, 5.0, 21.0},
new Object[]{6.0f, 6.0, 6.0, 21.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
@ -277,12 +265,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy(
String contextName,
Map<String, Object> context
)
@Test
public void testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -308,7 +292,7 @@ public class MSQWindowTest extends MSQTestBase
ColumnType.DOUBLE
)
))
.setContext(context)
.setContext(DEFAULT_MSQ_CONTEXT)
.build();
@ -325,7 +309,7 @@ public class MSQWindowTest extends MSQTestBase
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(groupByQuery),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("d0", ColumnType.FLOAT)
.add("d1", ColumnType.DOUBLE)
@ -372,7 +356,7 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0, 5.0, 5.0},
new Object[]{6.0f, 6.0, 6.0, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
@ -391,12 +375,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed(
String contextName,
Map<String, Object> context
)
@Test
public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -421,7 +401,7 @@ public class MSQWindowTest extends MSQTestBase
ColumnType.DOUBLE
)
))
.setContext(context)
.setContext(DEFAULT_MSQ_CONTEXT)
.build();
@ -438,7 +418,7 @@ public class MSQWindowTest extends MSQTestBase
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(groupByQuery),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("d0", ColumnType.FLOAT)
.add("d1", ColumnType.DOUBLE)
@ -485,7 +465,7 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0, 5.0, 5.0},
new Object[]{6.0f, 6.0, 6.0, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
@ -504,9 +484,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithEmptyOverWithGroupBy(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithEmptyOverWithGroupBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -525,7 +504,7 @@ public class MSQWindowTest extends MSQTestBase
ColumnType.FLOAT
)
))
.setContext(context)
.setContext(DEFAULT_MSQ_CONTEXT)
.build();
@ -538,7 +517,7 @@ public class MSQWindowTest extends MSQTestBase
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(groupByQuery),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaivePartitioningOperatorFactory(ImmutableList.of()),
@ -567,7 +546,7 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 21.0},
new Object[]{6.0f, 21.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
@ -586,9 +565,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithNoGroupByAndPartition(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithNoGroupByAndPartition()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -603,7 +581,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"}]")
.build();
@ -617,7 +595,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("m1"))),
@ -647,13 +625,12 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0},
new Object[]{6.0f, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -668,7 +645,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]"
@ -685,7 +662,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaiveSortOperatorFactory(ImmutableList.of(
@ -718,13 +695,12 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0},
new Object[]{6.0f, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithNoGroupByAndPartitionByAnother()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -739,7 +715,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]"
@ -756,7 +732,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("m2"))),
@ -786,13 +762,12 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0},
new Object[]{6.0f, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithGroupByAndInnerLimit(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithGroupByAndInnerLimit()
{
RowSignature rowSignature = RowSignature.builder()
.add("m1", ColumnType.FLOAT)
@ -825,10 +800,10 @@ public class MSQWindowTest extends MSQTestBase
)
))
.setLimit(5)
.setContext(context)
.setContext(DEFAULT_MSQ_CONTEXT)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaivePartitioningOperatorFactory(ImmutableList.of()),
@ -861,17 +836,16 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{4.0f, 15.0},
new Object[]{5.0f, 15.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns()
{
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"v0\",\"type\":\"LONG\"}]"
@ -901,7 +875,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("v0", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
@ -936,19 +910,18 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{3, 5.0f, 5.0},
new Object[]{3, 6.0f, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithNoGroupByAndEmptyOver(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithNoGroupByAndEmptyOver()
{
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"m1\",\"type\":\"FLOAT\"}]")
.build();
@ -973,7 +946,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaivePartitioningOperatorFactory(ImmutableList.of()),
@ -1002,17 +975,16 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 21.0},
new Object[]{6.0f, 21.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithPartitionByOrderBYWithJoin(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithPartitionByOrderBYWithJoin()
{
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@ -1021,7 +993,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> contextWithRowSignature1 =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]"
@ -1070,7 +1042,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("m1", ColumnType.FLOAT)
.add("w0", ColumnType.DOUBLE)
@ -1106,17 +1078,16 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0, 5.0},
new Object[]{6.0f, 6.0, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithEmptyOverWithJoin(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithEmptyOverWithJoin()
{
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@ -1125,7 +1096,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> contextWithRowSignature1 =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]"
@ -1174,7 +1145,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("m1", ColumnType.FLOAT)
.add("w0", ColumnType.DOUBLE)
@ -1209,13 +1180,12 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 21.0, 5.0},
new Object[]{6.0f, 21.0, 6.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithDim2(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithDim2()
{
RowSignature rowSignature = RowSignature.builder()
.add("dim2", ColumnType.STRING)
@ -1230,7 +1200,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@ -1247,7 +1217,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("dim2", ColumnType.STRING).add("w0", ColumnType.DOUBLE).build(),
ImmutableList.of(
new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("dim2"))),
@ -1287,17 +1257,16 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{"abc", 5.0},
new Object[]{null, 8.0}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithEmptyOverWithUnnest(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithEmptyOverWithUnnest()
{
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@ -1333,7 +1302,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("m1", ColumnType.FLOAT)
.add("w0", ColumnType.DOUBLE)
@ -1370,17 +1339,16 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 24.0, NullHandling.sqlCompatible() ? null : ""},
new Object[]{6.0f, 24.0, NullHandling.sqlCompatible() ? null : ""}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnFooWithPartitionByAndWithUnnest(String contextName, Map<String, Object> context)
@Test
public void testWindowOnFooWithPartitionByAndWithUnnest()
{
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@ -1416,7 +1384,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder()
.add("m1", ColumnType.FLOAT)
.add("w0", ColumnType.DOUBLE)
@ -1454,14 +1422,13 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{5.0f, 5.0, NullHandling.sqlCompatible() ? null : ""},
new Object[]{6.0f, 6.0, NullHandling.sqlCompatible() ? null : ""}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
// Insert Tests
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertWithWindow(String contextName, Map<String, Object> context)
@Test
public void testInsertWithWindow()
{
List<Object[]> expectedRows = ImmutableList.of(
new Object[]{946684800000L, 1.0f, 1.0},
@ -1484,7 +1451,7 @@ public class MSQWindowTest extends MSQTestBase
+ "SUM(m1) OVER(PARTITION BY m1) as summ1\n"
+ "from foo\n"
+ "GROUP BY __time, m1 PARTITIONED BY ALL")
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
@ -1492,9 +1459,8 @@ public class MSQWindowTest extends MSQTestBase
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertWithWindowEmptyOver(String contextName, Map<String, Object> context)
@Test
public void testInsertWithWindowEmptyOver()
{
List<Object[]> expectedRows = ImmutableList.of(
new Object[]{946684800000L, 1.0f, 21.0},
@ -1517,7 +1483,7 @@ public class MSQWindowTest extends MSQTestBase
+ "SUM(m1) OVER() as summ1\n"
+ "from foo\n"
+ "GROUP BY __time, m1 PARTITIONED BY ALL")
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
@ -1525,9 +1491,8 @@ public class MSQWindowTest extends MSQTestBase
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertWithWindowPartitionByOrderBy(String contextName, Map<String, Object> context)
@Test
public void testInsertWithWindowPartitionByOrderBy()
{
List<Object[]> expectedRows = ImmutableList.of(
new Object[]{946684800000L, 1.0f, 1.0},
@ -1550,7 +1515,7 @@ public class MSQWindowTest extends MSQTestBase
+ "SUM(m1) OVER(PARTITION BY m1 ORDER BY m1 ASC) as summ1\n"
+ "from foo\n"
+ "GROUP BY __time, m1 PARTITIONED BY ALL")
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
@ -1560,9 +1525,8 @@ public class MSQWindowTest extends MSQTestBase
// Replace Tests
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceWithWindowsAndUnnest(String contextName, Map<String, Object> context)
@Test
public void testReplaceWithWindowsAndUnnest()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -1576,7 +1540,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY ALL CLUSTERED BY m1")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -1594,9 +1558,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSimpleWindowWithPartitionBy(String contextName, Map<String, Object> context)
@Test
public void testSimpleWindowWithPartitionBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -1609,7 +1572,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY ALL CLUSTERED BY m1")
.setExpectedDataSource("foo")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -1625,9 +1588,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSimpleWindowWithEmptyOver(String contextName, Map<String, Object> context)
@Test
public void testSimpleWindowWithEmptyOver()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -1640,7 +1602,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY ALL CLUSTERED BY m1")
.setExpectedDataSource("foo")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -1656,9 +1618,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSimpleWindowWithEmptyOverNoGroupBy(String contextName, Map<String, Object> context)
@Test
public void testSimpleWindowWithEmptyOverNoGroupBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -1671,7 +1632,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY ALL CLUSTERED BY m1")
.setExpectedDataSource("foo")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -1687,9 +1648,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSimpleWindowWithDuplicateSelectNode(String contextName, Map<String, Object> context)
@Test
public void testSimpleWindowWithDuplicateSelectNode()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -1703,7 +1663,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY ALL CLUSTERED BY m1")
.setExpectedDataSource("foo")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -1719,9 +1679,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSimpleWindowWithJoins(String contextName, Map<String, Object> context)
@Test
public void testSimpleWindowWithJoins()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -1735,7 +1694,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY DAY CLUSTERED BY m1")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -1761,9 +1720,8 @@ public class MSQWindowTest extends MSQTestBase
}
// Bigger dataset tests
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectWithWikipedia(String contextName, Map<String, Object> context)
@Test
public void testSelectWithWikipedia()
{
RowSignature rowSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
@ -1779,7 +1737,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"}]"
@ -1797,7 +1755,7 @@ public class MSQWindowTest extends MSQTestBase
.context(contextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.add("w0", ColumnType.LONG).build(),
@ -1830,17 +1788,16 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{"Albuquerque", 9L, 140L},
new Object[]{"Albuquerque", 2L, 140L}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectWithWikipediaEmptyOverWithCustomContext(String contextName, Map<String, Object> context)
@Test
public void testSelectWithWikipediaEmptyOverWithCustomContext()
{
final Map<String, Object> customContext =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200)
.build();
@ -1852,9 +1809,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String contextName, Map<String, Object> context)
@Test
public void testSelectWithWikipediaWithPartitionKeyNotInSelect()
{
RowSignature rowSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
@ -1870,7 +1826,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> innerContextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"}]"
@ -1888,7 +1844,7 @@ public class MSQWindowTest extends MSQTestBase
.context(innerContextWithRowSignature)
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.add("w0", ColumnType.LONG).build(),
@ -1902,7 +1858,7 @@ public class MSQWindowTest extends MSQTestBase
final Map<String, Object> outerContextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"w0\",\"type\":\"LONG\"}]"
@ -1941,13 +1897,12 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{"Tokyo", 0L, 12615L},
new Object[]{"Santiago", 161L, 401L}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testGroupByWithWikipedia(String contextName, Map<String, Object> context)
@Test
public void testGroupByWithWikipedia()
{
RowSignature rowSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
@ -1972,7 +1927,7 @@ public class MSQWindowTest extends MSQTestBase
ColumnType.LONG
)
))
.setContext(context)
.setContext(DEFAULT_MSQ_CONTEXT)
.build();
@ -1985,7 +1940,7 @@ public class MSQWindowTest extends MSQTestBase
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(groupByQuery),
new LegacySegmentSpec(Intervals.ETERNITY),
context,
DEFAULT_MSQ_CONTEXT,
RowSignature.builder().add("d0", ColumnType.STRING)
.add("d1", ColumnType.LONG)
.add("w0", ColumnType.LONG).build(),
@ -2019,13 +1974,12 @@ public class MSQWindowTest extends MSQTestBase
new Object[]{"Albuquerque", 9L, 140L},
new Object[]{"Albuquerque", 129L, 140L}
))
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceGroupByOnWikipedia(String contextName, Map<String, Object> context)
@Test
public void testReplaceGroupByOnWikipedia()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -2041,7 +1995,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY ALL CLUSTERED BY added")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -2055,11 +2009,10 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String contextName, Map<String, Object> context)
@Test
public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers()
{
final Map<String, Object> multipleWorkerContext = new HashMap<>(context);
final Map<String, Object> multipleWorkerContext = new HashMap<>(DEFAULT_MSQ_CONTEXT);
multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 5);
final RowSignature rowSignature = RowSignature.builder()
@ -2286,9 +2239,8 @@ public class MSQWindowTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceWithPartitionedByDayOnWikipedia(String contextName, Map<String, Object> context)
@Test
public void testReplaceWithPartitionedByDayOnWikipedia()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -2304,7 +2256,7 @@ public class MSQWindowTest extends MSQTestBase
+ "PARTITIONED BY DAY")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedResultRows(
ImmutableList.of(
@ -2323,4 +2275,41 @@ public class MSQWindowTest extends MSQTestBase
)))
.verifyResults();
}
@Test
public void testFailurePartitionByMVD_1()
{
testSelectQuery()
.setSql("select cityName, countryName, array_to_mv(array[1,length(cityName)]), "
+ "row_number() over (partition by array_to_mv(array[1,length(cityName)]) order by countryName, cityName)\n"
+ "from wikipedia\n"
+ "where countryName in ('Austria', 'Republic of Korea') and cityName is not null\n"
+ "order by 1, 2, 3")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
))
.verifyExecutionError();
}
@Test
public void testFailurePartitionByMVD_2()
{
testSelectQuery()
.setSql(" select cityName, countryName, array_to_mv(array[1,length(cityName)]),"
+ "row_number() over (partition by countryName order by countryName, cityName) as c1,\n"
+ "row_number() over (partition by array_to_mv(array[1,length(cityName)]) order by countryName, cityName) as c2\n"
+ "from wikipedia\n"
+ "where countryName in ('Austria', 'Republic of Korea') and cityName is not null\n"
+ "order by 1, 2, 3")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
))
.verifyExecutionError();
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
@ -28,6 +29,7 @@ import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
@ -128,7 +130,16 @@ public class RearrangedRowsAndColumns implements RowsAndColumns
@Override
public Object getObject(int rowNum)
{
return accessor.getObject(pointers[start + rowNum]);
Object value = accessor.getObject(pointers[start + rowNum]);
if (ColumnType.STRING.equals(getType()) && value instanceof List) {
// special handling to reject MVDs
throw new UOE(
"Encountered a multi value column [%s]. Window processing does not support MVDs. "
+ "Consider using UNNEST or MV_TO_ARRAY.",
name
);
}
return value;
}
@Override