Merge remote-tracking branch 'apache/master' into quidem-msq

Zoltan Haindrich 2024-08-06 11:42:04 +00:00
commit 29b2b559d9
15 changed files with 402 additions and 162 deletions

View File

@ -43,7 +43,6 @@ import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.InlineDataSource;
@ -424,21 +423,11 @@ public class DataSourcePlan
@Nullable final QueryContext parentContext
)
{
// check if parentContext has a window operator
final Map<String, Object> windowShuffleMap = new HashMap<>();
if (parentContext != null && parentContext.containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
windowShuffleMap.put(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, parentContext.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL));
}
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
queryId,
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
windowShuffleMap.isEmpty()
? dataSource.getQuery()
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
: dataSource.getQuery()
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
.withOverriddenContext(windowShuffleMap),
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
queryKit,
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount),
maxWorkerCount,

View File

@ -20,7 +20,6 @@
package org.apache.druid.msq.querykit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
@ -88,17 +87,6 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
log.info("Created operatorList with operator factories: [%s]", operatorList);
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
// add this shuffle spec to the last stage of the inner query
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId);
if (nextShuffleSpec != null) {
final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
originalQuery = (WindowOperatorQuery) originalQuery.withOverriddenContext(ImmutableMap.of(
MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
windowClusterBy
));
}
final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
queryKit,
queryId,
@ -112,7 +100,8 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
false
);
dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec);
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
@ -309,12 +298,16 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
}
}
if (partition == null || partition.getPartitionColumns().isEmpty()) {
if (partition == null) {
// If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from the previous stage.
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
return null;
}
if (partition.getPartitionColumns().isEmpty()) {
return MixShuffleSpec.instance();
}
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partition.getPartitionColumns()) {
KeyColumn kc;
@ -328,4 +321,29 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
}
/**
* Override the shuffle spec of the last stage based on the shuffling required by the first window stage.
* @param queryId        query id of the query being planned
* @param dataSourcePlan plan of the inner query whose stages are copied into the returned builder
* @param shuffleSpec    shuffle spec required by the first window stage; applied to the inner query's final stage
* @return builder containing the inner query's stages, with the final stage's shuffle spec and signature overridden
*/
private QueryDefinitionBuilder makeQueryDefinitionBuilder(String queryId, DataSourcePlan dataSourcePlan, ShuffleSpec shuffleSpec)
{
final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId);
int previousStageNumber = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getStageNumber();
for (final StageDefinition stageDef : dataSourcePlan.getSubQueryDefBuilder().get().build().getStageDefinitions()) {
if (stageDef.getStageNumber() == previousStageNumber) {
RowSignature rowSignature = QueryKitUtils.sortableSignature(
stageDef.getSignature(),
shuffleSpec.clusterBy().getColumns()
);
queryDefBuilder.add(StageDefinition.builder(stageDef).shuffleSpec(shuffleSpec).signature(rowSignature));
} else {
queryDefBuilder.add(StageDefinition.builder(stageDef));
}
}
return queryDefBuilder;
}
}
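
For orientation, here is a minimal sketch of the shuffle decision the diff above converges on. It is illustrative only: the helper name and the ASCENDING key order are assumptions, not part of the commit, and the types (ShuffleSpec, MixShuffleSpec, HashShuffleSpec, ClusterBy, KeyColumn, KeyOrder, NaivePartitioningOperatorFactory) are the ones already imported by this file.

// Illustrative sketch, not committed code: choosing the shuffle for the first window stage.
@Nullable
private static ShuffleSpec shuffleForFirstWindowStage(
    @Nullable NaivePartitioningOperatorFactory partition,
    int maxWorkerCount
)
{
  if (partition == null) {
    // No partitioning operator: the data is already partitioned correctly, so keep the
    // previous stage's shuffle spec (the caller treats null as "no override").
    return null;
  }
  if (partition.getPartitionColumns().isEmpty()) {
    // OVER () with an empty PARTITION BY: mix everything into a single unsorted partition.
    return MixShuffleSpec.instance();
  }
  // Otherwise hash-shuffle on the PARTITION BY columns of the first window.
  final List<KeyColumn> keyCols = new ArrayList<>();
  for (String column : partition.getPartitionColumns()) {
    keyCols.add(new KeyColumn(column, KeyOrder.ASCENDING)); // key order assumed ASCENDING here
  }
  return new HashShuffleSpec(new ClusterBy(keyCols, 0), maxWorkerCount);
}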

View File

@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.HashShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
@ -39,7 +38,6 @@ import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DimensionComparisonUtils;
import org.apache.druid.query.Query;
import org.apache.druid.query.dimension.DimensionSpec;
@ -168,104 +166,40 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
partitionBoost
);
final ShuffleSpec nextShuffleWindowSpec = getShuffleSpecForNextWindow(originalQuery, maxWorkerCount);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
.signature(resultSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(
shuffleSpecFactoryPostAggregation != null
? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
: null
)
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
);
if (nextShuffleWindowSpec == null) {
if (doLimitOrOffset) {
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(
shuffleSpecFactoryPostAggregation != null
? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
: null
.maxWorkerCount(1)
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
)
)
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
);
if (doLimitOrOffset) {
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
)
)
);
}
} else {
final RowSignature stageSignature;
// sort the signature to make sure the prefix is aligned
stageSignature = QueryKitUtils.sortableSignature(
resultSignature,
nextShuffleWindowSpec.clusterBy().getColumns()
);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
.signature(stageSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(doLimitOrOffset ? (shuffleSpecFactoryPostAggregation != null
? shuffleSpecFactoryPostAggregation.build(
resultClusterBy,
false
)
: null) : nextShuffleWindowSpec)
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
);
if (doLimitOrOffset) {
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
final ShuffleSpec finalShuffleSpec = resultShuffleSpecFactory.build(resultClusterBy, false);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 2)
.inputs(new StageInputSpec(firstStageNumber + 1))
.signature(resultSignature)
.maxWorkerCount(1)
.shuffleSpec(finalShuffleSpec)
.processorFactory(
new OffsetLimitFrameProcessorFactory(
limitSpec.getOffset(),
limitSpec.isLimited() ? (long) limitSpec.getLimit() : null
)
)
);
}
}
return queryDefBuilder.build();
}
/**
* @param originalQuery which has the context for the next shuffle if that's present in the next window
* @param maxWorkerCount max worker count
* @return shuffle spec without partition boosting for next stage, null if there is no partition by for next window
*/
private ShuffleSpec getShuffleSpecForNextWindow(GroupByQuery originalQuery, int maxWorkerCount)
{
final ShuffleSpec nextShuffleWindowSpec;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
nextShuffleWindowSpec = new HashShuffleSpec(
windowClusterBy,
maxWorkerCount
);
} else {
nextShuffleWindowSpec = null;
}
return nextShuffleWindowSpec;
}
/**
* Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all
* aggregations are nonfinalized.

View File

@ -37,7 +37,6 @@ import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
@ -129,26 +128,8 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
);
}
// Update partition by of next window
final RowSignature signatureSoFar = signatureBuilder.build();
boolean addShuffle = true;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
for (KeyColumn c : windowClusterBy.getColumns()) {
if (!signatureSoFar.contains(c.columnName())) {
addShuffle = false;
break;
}
}
if (addShuffle) {
clusterByColumns.addAll(windowClusterBy.getColumns());
}
} else {
// Add partition boosting column.
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
}
clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING));
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
final ClusterBy clusterBy =
QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);

View File

@ -167,8 +167,6 @@ public class MultiStageQueryContext
public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode";
public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.ARRAY;
public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol";
public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "maxRowsMaterializedInWindow";
public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification";

View File

@ -37,7 +37,9 @@ import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
@ -48,6 +50,7 @@ import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.operator.window.WindowFramedAggregateProcessor;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.column.ColumnType;
@ -65,6 +68,8 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -1842,7 +1847,7 @@ public class MSQWindowTest extends MSQTestBase
.setSql(
"select cityName, added, SUM(added) OVER () cc from wikipedia")
.setQueryContext(customContext)
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15676, 200))
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15921, 200))
.verifyResults();
}
@ -2048,4 +2053,235 @@ public class MSQWindowTest extends MSQTestBase
.setExpectedSegments(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String contextName, Map<String, Object> context)
{
final Map<String, Object> multipleWorkerContext = new HashMap<>(context);
multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 5);
final RowSignature rowSignature = RowSignature.builder()
.add("countryName", ColumnType.STRING)
.add("cityName", ColumnType.STRING)
.add("channel", ColumnType.STRING)
.add("c1", ColumnType.LONG)
.add("c2", ColumnType.LONG)
.build();
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(multipleWorkerContext)
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"d0\",\"type\":\"STRING\"},{\"name\":\"d1\",\"type\":\"STRING\"},{\"name\":\"d2\",\"type\":\"STRING\"},{\"name\":\"w0\",\"type\":\"LONG\"},{\"name\":\"w1\",\"type\":\"LONG\"}]"
)
.build();
final GroupByQuery groupByQuery = GroupByQuery.builder()
.setDataSource(CalciteTests.WIKIPEDIA)
.setInterval(querySegmentSpec(Filtration
.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(
new DefaultDimensionSpec(
"countryName",
"d0",
ColumnType.STRING
),
new DefaultDimensionSpec(
"cityName",
"d1",
ColumnType.STRING
),
new DefaultDimensionSpec(
"channel",
"d2",
ColumnType.STRING
)
))
.setDimFilter(in("countryName", ImmutableList.of("Austria", "Republic of Korea")))
.setContext(multipleWorkerContext)
.build();
final AggregatorFactory[] aggs = {
new FilteredAggregatorFactory(new CountAggregatorFactory("w1"), notNull("d2"), "w1")
};
final WindowOperatorQuery windowQuery = new WindowOperatorQuery(
new QueryDataSource(groupByQuery),
new LegacySegmentSpec(Intervals.ETERNITY),
multipleWorkerContext,
RowSignature.builder()
.add("d0", ColumnType.STRING)
.add("d1", ColumnType.STRING)
.add("d2", ColumnType.STRING)
.add("w0", ColumnType.LONG)
.add("w1", ColumnType.LONG).build(),
ImmutableList.of(
new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"), ColumnWithDirection.ascending("d1"), ColumnWithDirection.ascending("d2"))),
new NaivePartitioningOperatorFactory(Collections.emptyList()),
new WindowOperatorFactory(new WindowRowNumberProcessor("w0")),
new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d1"), ColumnWithDirection.ascending("d0"), ColumnWithDirection.ascending("d2"))),
new NaivePartitioningOperatorFactory(Collections.singletonList("d1")),
new WindowOperatorFactory(new WindowFramedAggregateProcessor(WindowFrame.forOrderBy("d0", "d1", "d2"), aggs))
),
ImmutableList.of()
);
final ScanQuery scanQuery = Druids.newScanQueryBuilder()
.dataSource(new QueryDataSource(windowQuery))
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("d0", "d1", "d2", "w0", "w1")
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("d0", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("d1", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("d2", ScanQuery.Order.ASCENDING)
)
)
.columnTypes(ColumnType.STRING, ColumnType.STRING, ColumnType.STRING, ColumnType.LONG, ColumnType.LONG)
.limit(Long.MAX_VALUE)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(contextWithRowSignature)
.build();
final String sql = "select countryName, cityName, channel, \n"
+ "row_number() over (order by countryName, cityName, channel) as c1, \n"
+ "count(channel) over (partition by cityName order by countryName, cityName, channel) as c2\n"
+ "from wikipedia\n"
+ "where countryName in ('Austria', 'Republic of Korea')\n"
+ "group by countryName, cityName, channel "
+ "order by countryName, cityName, channel";
final String nullValue = NullHandling.sqlCompatible() ? null : "";
testSelectQuery()
.setSql(sql)
.setExpectedMSQSpec(MSQSpec.builder()
.query(scanQuery)
.columnMappings(
new ColumnMappings(ImmutableList.of(
new ColumnMapping("d0", "countryName"),
new ColumnMapping("d1", "cityName"),
new ColumnMapping("d2", "channel"),
new ColumnMapping("w0", "c1"),
new ColumnMapping("w1", "c2")
)
))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(
ImmutableList.<Object[]>of(
new Object[]{"Austria", nullValue, "#de.wikipedia", 1L, 1L},
new Object[]{"Austria", "Horsching", "#de.wikipedia", 2L, 1L},
new Object[]{"Austria", "Vienna", "#de.wikipedia", 3L, 1L},
new Object[]{"Austria", "Vienna", "#es.wikipedia", 4L, 2L},
new Object[]{"Austria", "Vienna", "#tr.wikipedia", 5L, 3L},
new Object[]{"Republic of Korea", nullValue, "#en.wikipedia", 6L, 2L},
new Object[]{"Republic of Korea", nullValue, "#ja.wikipedia", 7L, 3L},
new Object[]{"Republic of Korea", nullValue, "#ko.wikipedia", 8L, 4L},
new Object[]{"Republic of Korea", "Jeonju", "#ko.wikipedia", 9L, 1L},
new Object[]{"Republic of Korea", "Seongnam-si", "#ko.wikipedia", 10L, 1L},
new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", 11L, 1L},
new Object[]{"Republic of Korea", "Suwon-si", "#ko.wikipedia", 12L, 1L},
new Object[]{"Republic of Korea", "Yongsan-dong", "#ko.wikipedia", 13L, 1L}
)
)
.setQueryContext(multipleWorkerContext)
// Stage 0
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(872).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4, 4, 4, 1).bytes(251, 266, 300, 105).frames(1, 1, 1, 1),
0, 0, "shuffle"
)
// Stage 1, Worker 0
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(251).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(251).frames(1),
1, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(251).frames(1),
1, 0, "shuffle"
)
// Stage 1, Worker 1
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 4).bytes(0, 266).frames(0, 1),
1, 1, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 4).bytes(0, 266).frames(0, 1),
1, 1, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(266).frames(1),
1, 1, "shuffle"
)
// Stage 1, Worker 2
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 4).bytes(0, 0, 300).frames(0, 0, 1),
1, 2, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 4).bytes(0, 0, 300).frames(0, 0, 1),
1, 2, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(300).frames(1),
1, 2, "shuffle"
)
// Stage 1, Worker 3
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 0, 1).bytes(0, 0, 0, 105).frames(0, 0, 0, 1),
1, 3, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 0, 1).bytes(0, 0, 0, 105).frames(0, 0, 0, 1),
1, 3, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(1).bytes(105).frames(1),
1, 3, "shuffle"
)
// Stage 2 (window stage)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(922).frames(4),
2, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(1158).frames(1),
2, 0, "output"
)
// Stage 3, Worker 0
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(1158).frames(1),
3, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(1379).frames(1),
3, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(1327).frames(1),
3, 0, "shuffle"
)
.verifyResults();
}
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.column.ColumnType;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class WindowRowNumberProcessor implements Processor
{
@ -137,4 +138,23 @@ public class WindowRowNumberProcessor implements Processor
{
return Collections.singletonList(outputColumn);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WindowRowNumberProcessor that = (WindowRowNumberProcessor) o;
return Objects.equals(outputColumn, that.outputColumn);
}
@Override
public int hashCode()
{
return Objects.hashCode(outputColumn);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.ranking;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
@ -61,4 +62,13 @@ public class WindowRowNumberProcessorTest
final RowsAndColumns results = processor.process(rac);
expectations.validate(results);
}
@Test
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(WindowRowNumberProcessor.class)
.withNonnullFields("outputColumn")
.usingGetClass()
.verify();
}
}

View File

@ -133,6 +133,8 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
throw Util.unexpected(windowOrId.getKind());
}
updateBoundsIfNeeded(targetWindow);
@Nullable
SqlNode lowerBound = targetWindow.getLowerBound();
@Nullable
@ -144,17 +146,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
);
}
if (lowerBound != null && upperBound == null) {
if (lowerBound.getKind() == SqlKind.FOLLOWING || SqlWindow.isUnboundedFollowing(lowerBound)) {
upperBound = lowerBound;
lowerBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
} else {
upperBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
}
targetWindow.setLowerBound(lowerBound);
targetWindow.setUpperBound(upperBound);
}
boolean hasBounds = lowerBound != null || upperBound != null;
if (call.getKind() == SqlKind.NTILE && hasBounds) {
throw buildCalciteContextException(
@ -758,6 +749,28 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
|| SqlWindow.isUnboundedPreceding(bound);
}
/**
* If the window has a lower bound but no upper bound, completes the frame: a FOLLOWING lower bound
* is moved to the upper bound (with CURRENT ROW as the new lower bound); otherwise CURRENT ROW becomes the upper bound.
*/
private void updateBoundsIfNeeded(SqlWindow window)
{
@Nullable
SqlNode lowerBound = window.getLowerBound();
@Nullable
SqlNode upperBound = window.getUpperBound();
if (lowerBound != null && upperBound == null) {
if (lowerBound.getKind() == SqlKind.FOLLOWING || SqlWindow.isUnboundedFollowing(lowerBound)) {
upperBound = lowerBound;
lowerBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
} else {
upperBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
}
window.setLowerBound(lowerBound);
window.setUpperBound(upperBound);
}
}
@Override
public void validateCall(SqlCall call, SqlValidatorScope scope)
{
@ -812,6 +825,10 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
sqlNode
);
}
if (sqlNode instanceof SqlWindow) {
SqlWindow window = (SqlWindow) sqlNode;
updateBoundsIfNeeded(window);
}
}
super.validateWindowClause(select);
}
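
For orientation, a minimal sketch of the bound completion performed by updateBoundsIfNeeded, expressed on plain strings rather than Calcite's SqlWindow; the helper and its name are hypothetical and only mirror the branch above.

// Hypothetical sketch, not Druid/Calcite code: the frame-completion rule on plain SQL text.
static String completeFrame(String lowerBoundOnly)
{
  if (lowerBoundOnly.contains("FOLLOWING")) {
    // A FOLLOWING-only lower bound would describe an inverted frame, so it becomes the
    // upper bound and CURRENT ROW becomes the lower bound.
    return "ROWS BETWEEN CURRENT ROW AND " + lowerBoundOnly;
  }
  // Otherwise the frame is completed with CURRENT ROW as the upper bound.
  return "ROWS BETWEEN " + lowerBoundOnly + " AND CURRENT ROW";
}

// completeFrame("UNBOUNDED FOLLOWING") -> "ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING"
// completeFrame("1 PRECEDING")         -> "ROWS BETWEEN 1 PRECEDING AND CURRENT ROW"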

View File

@ -7,10 +7,11 @@ sql: |
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS 1 PRECEDING),
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS CURRENT ROW),
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS 1 FOLLOWING),
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS UNBOUNDED FOLLOWING)
count(*) OVER W
FROM numfoo
WHERE dim2 IN ('a', 'abc')
GROUP BY dim2, dim1
WINDOW W AS (partition by dim2 ORDER BY dim1 ROWS UNBOUNDED FOLLOWING)
expectedOperators:
- {"type":"naiveSort","columns":[{"column":"_d1","direction":"ASC"},{"column":"_d0","direction":"ASC"}]}

View File

@ -206,7 +206,7 @@ export class ConsoleApplication extends React.PureComponent<
private readonly openSupervisorSubmit = () => {
this.openSupervisorDialog = true;
location.hash = 'supervisor';
location.hash = 'supervisors';
this.resetInitialsWithDelay();
};

View File

@ -23,7 +23,7 @@ import axios from 'axios';
import { Api } from '../singletons';
import type { RowColumn } from './general';
import { assemble } from './general';
import { assemble, lookupBy } from './general';
const CANCELED_MESSAGE = 'Query canceled by user.';
@ -345,10 +345,24 @@ export async function queryDruidSql<T = any>(
export interface QueryExplanation {
query: any;
signature: { name: string; type: string }[];
columnMappings: {
queryColumn: string;
outputColumn: string;
}[];
}
export function formatSignature(queryExplanation: QueryExplanation): string {
return queryExplanation.signature
.map(({ name, type }) => `${C.optionalQuotes(name)}::${type}`)
export function formatColumnMappingsAndSignature(queryExplanation: QueryExplanation): string {
const columnNameToType = lookupBy(
queryExplanation.signature,
c => c.name,
c => c.type,
);
return queryExplanation.columnMappings
.map(({ queryColumn, outputColumn }) => {
const type = columnNameToType[queryColumn];
return `${C.optionalQuotes(queryColumn)}${type ? `::${type}` : ''}→${C.optionalQuotes(
outputColumn,
)}`;
})
.join(', ');
}

View File

@ -185,7 +185,7 @@ exports[`ExplainDialog matches snapshot on some data (many queries) 1`] = `
label="Signature"
>
<Blueprint5.InputGroup
defaultValue="channel::STRING"
defaultValue="channel::STRING→channel"
readOnly={true}
/>
</Blueprint5.FormGroup>
@ -287,7 +287,7 @@ exports[`ExplainDialog matches snapshot on some data (many queries) 1`] = `
label="Signature"
>
<Blueprint5.InputGroup
defaultValue="channel::STRING"
defaultValue="channel::STRING→channel"
readOnly={true}
/>
</Blueprint5.FormGroup>
@ -473,7 +473,7 @@ exports[`ExplainDialog matches snapshot on some data (one query) 1`] = `
label="Signature"
>
<Blueprint5.InputGroup
defaultValue="d0::STRING, a0::LONG"
defaultValue="d0::STRING→channel, a0::LONG→"Count""
readOnly={true}
/>
</Blueprint5.FormGroup>

View File

@ -160,6 +160,16 @@ describe('ExplainDialog', () => {
type: 'LONG',
},
],
columnMappings: [
{
queryColumn: 'd0',
outputColumn: 'channel',
},
{
queryColumn: 'a0',
outputColumn: 'Count',
},
],
},
],
});
@ -199,6 +209,12 @@ describe('ExplainDialog', () => {
type: 'STRING',
},
],
columnMappings: [
{
queryColumn: 'channel',
outputColumn: 'channel',
},
],
},
{
query: {
@ -234,6 +250,12 @@ describe('ExplainDialog', () => {
type: 'STRING',
},
],
columnMappings: [
{
queryColumn: 'channel',
outputColumn: 'channel',
},
],
},
],
});

View File

@ -40,7 +40,7 @@ import { Api } from '../../../singletons';
import type { QueryExplanation } from '../../../utils';
import {
deepGet,
formatSignature,
formatColumnMappingsAndSignature,
getDruidErrorMessage,
nonEmptyArray,
queryDruidSql,
@ -141,7 +141,7 @@ export const ExplainDialog = React.memo(function ExplainDialog(props: ExplainDia
/>
</FormGroup>
<FormGroup className="signature-group" label="Signature">
<InputGroup defaultValue={formatSignature(queryExplanation)} readOnly />
<InputGroup defaultValue={formatColumnMappingsAndSignature(queryExplanation)} readOnly />
</FormGroup>
{openQueryLabel && (
<Button