mirror of https://github.com/apache/druid.git
Fix issues with partitioning boundaries for MSQ window functions (#16729)
* Fix issues with partitioning boundaries for MSQ window functions * Address review comments * Address review comments * Add test for coverage check failure * Address review comment * Remove DruidWindowQueryTest and WindowQueryTestBase, move those tests to DrillWindowQueryTest * Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java * Address review comments * Add test for equals and hashcode for WindowOperatorQueryFrameProcessorFactory * Address review comment * Fix checkstyle --------- Co-authored-by: Benedict Jin <asdf2014@apache.org>
This commit is contained in:
parent
44b3f8e588
commit
b53c26f5c5
|
@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.logger.Logger;
|
|||
import org.apache.druid.msq.indexing.error.MSQException;
|
||||
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
|
||||
import org.apache.druid.query.groupby.ResultRow;
|
||||
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
|
||||
import org.apache.druid.query.operator.OffsetLimit;
|
||||
import org.apache.druid.query.operator.Operator;
|
||||
import org.apache.druid.query.operator.OperatorFactory;
|
||||
|
@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
private final WindowOperatorQuery query;
|
||||
|
||||
private final List<OperatorFactory> operatorFactoryList;
|
||||
private final List<String> partitionColumnNames;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ArrayList<RowsAndColumns> frameRowsAndCols;
|
||||
private final ArrayList<RowsAndColumns> resultRowAndCols;
|
||||
|
@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
private final FrameReader frameReader;
|
||||
private final ArrayList<ResultRow> objectsOfASingleRac;
|
||||
private final int maxRowsMaterialized;
|
||||
List<Integer> partitionColsIndex;
|
||||
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
|
||||
private Cursor frameCursor = null;
|
||||
private Supplier<ResultRow> rowSupplierFromFrameCursor;
|
||||
|
@ -97,7 +96,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
final List<OperatorFactory> operatorFactoryList,
|
||||
final RowSignature rowSignature,
|
||||
final boolean isOverEmpty,
|
||||
final int maxRowsMaterializedInWindow
|
||||
final int maxRowsMaterializedInWindow,
|
||||
final List<String> partitionColumnNames
|
||||
)
|
||||
{
|
||||
this.inputChannel = inputChannel;
|
||||
|
@ -110,9 +110,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
this.frameRowsAndCols = new ArrayList<>();
|
||||
this.resultRowAndCols = new ArrayList<>();
|
||||
this.objectsOfASingleRac = new ArrayList<>();
|
||||
this.partitionColsIndex = new ArrayList<>();
|
||||
this.isOverEmpty = isOverEmpty;
|
||||
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
|
||||
this.partitionColumnNames = partitionColumnNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -177,12 +177,12 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
*
|
||||
* Future thoughts: {@link https://github.com/apache/druid/issues/16126}
|
||||
*
|
||||
* 1. We are writing 1 partition to each frame in this way. In case of low cardinality data
|
||||
* we will me making a large number of small frames. We can have a check to keep size of frame to a value
|
||||
* 1. We are writing 1 partition to each frame in this way. In case of high cardinality data
|
||||
* we will be making a large number of small frames. We can have a check to keep size of frame to a value
|
||||
* say 20k rows and keep on adding to the same pending frame and not create a new frame
|
||||
*
|
||||
* 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data
|
||||
* with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
|
||||
* with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
|
||||
* Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data.
|
||||
* We might think to reimplement them in the MSQ way so that we do not have to materialize so much data
|
||||
*/
|
||||
|
@ -218,7 +218,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
final Frame frame = inputChannel.read();
|
||||
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
|
||||
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
|
||||
partitionColsIndex = findPartitionColumns(frameReader.signature());
|
||||
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
|
||||
for (int i = 0; i < fieldSuppliers.length; i++) {
|
||||
final ColumnValueSelector<?> selector =
|
||||
|
@ -259,18 +258,17 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
if (outputRow == null) {
|
||||
outputRow = currentRow;
|
||||
objectsOfASingleRac.add(currentRow);
|
||||
} else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
|
||||
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
|
||||
// if they have the same partition key
|
||||
// keep adding them after checking
|
||||
// guardrails
|
||||
objectsOfASingleRac.add(currentRow);
|
||||
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
|
||||
throw new MSQException(new TooManyRowsInAWindowFault(
|
||||
objectsOfASingleRac.size(),
|
||||
maxRowsMaterialized
|
||||
));
|
||||
}
|
||||
objectsOfASingleRac.add(currentRow);
|
||||
|
||||
} else {
|
||||
// key change noted
|
||||
// create rac from the rows seen before
|
||||
|
@ -484,37 +482,36 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
|
|||
frameRowsAndCols.add(ldrc);
|
||||
}
|
||||
|
||||
private List<Integer> findPartitionColumns(RowSignature rowSignature)
|
||||
{
|
||||
List<Integer> indexList = new ArrayList<>();
|
||||
for (OperatorFactory of : operatorFactoryList) {
|
||||
if (of instanceof NaivePartitioningOperatorFactory) {
|
||||
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
|
||||
indexList.add(rowSignature.indexOf(s));
|
||||
}
|
||||
}
|
||||
}
|
||||
return indexList;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Compare two rows based only the columns in the partitionIndices
|
||||
* In case the parition indices is empty or null compare entire row
|
||||
*
|
||||
* Compare two rows based on the columns in partitionColumnNames.
|
||||
* If the partitionColumnNames is empty or null, compare entire row.
|
||||
* <p>
|
||||
* For example, say:
|
||||
* <ul>
|
||||
* <li>partitionColumnNames = ["d1", "d2"]</li>
|
||||
* <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
|
||||
* <li>frameReader.signature.indexOf("d1") = 0</li>
|
||||
* <li>frameReader.signature.indexOf("d2") = 1</li>
|
||||
* <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
|
||||
* <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise.
|
||||
* Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
|
||||
*/
|
||||
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
|
||||
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
|
||||
{
|
||||
if (partitionIndices == null || partitionIndices.isEmpty()) {
|
||||
if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
|
||||
return row1.equals(row2);
|
||||
} else {
|
||||
int match = 0;
|
||||
for (int i : partitionIndices) {
|
||||
for (String columnName : partitionColumnNames) {
|
||||
int i = frameReader.signature().indexOf(columnName);
|
||||
if (Objects.equals(row1.get(i), row2.get(i))) {
|
||||
match++;
|
||||
}
|
||||
}
|
||||
return match == partitionIndices.size();
|
||||
return match == partitionColumnNames.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
|
|||
private final RowSignature stageRowSignature;
|
||||
private final boolean isEmptyOver;
|
||||
private final int maxRowsMaterializedInWindow;
|
||||
private final List<String> partitionColumnNames;
|
||||
|
||||
@JsonCreator
|
||||
public WindowOperatorQueryFrameProcessorFactory(
|
||||
|
@ -68,7 +69,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
|
|||
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
|
||||
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
|
||||
@JsonProperty("emptyOver") boolean emptyOver,
|
||||
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow
|
||||
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
|
||||
@JsonProperty("partitionColumnNames") List<String> partitionColumnNames
|
||||
)
|
||||
{
|
||||
this.query = Preconditions.checkNotNull(query, "query");
|
||||
|
@ -76,6 +78,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
|
|||
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
|
||||
this.isEmptyOver = emptyOver;
|
||||
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
|
||||
this.partitionColumnNames = partitionColumnNames;
|
||||
}
|
||||
|
||||
@JsonProperty("query")
|
||||
|
@ -90,6 +93,12 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
|
|||
return operatorList;
|
||||
}
|
||||
|
||||
@JsonProperty("partitionColumnNames")
|
||||
public List<String> getPartitionColumnNames()
|
||||
{
|
||||
return partitionColumnNames;
|
||||
}
|
||||
|
||||
@JsonProperty("stageRowSignature")
|
||||
public RowSignature getSignature()
|
||||
{
|
||||
|
@ -148,7 +157,6 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
|
|||
readableInput -> {
|
||||
final OutputChannel outputChannel =
|
||||
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());
|
||||
|
||||
return new WindowOperatorQueryFrameProcessor(
|
||||
query,
|
||||
readableInput.getChannel(),
|
||||
|
@ -159,7 +167,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
|
|||
operatorList,
|
||||
stageRowSignature,
|
||||
isEmptyOver,
|
||||
maxRowsMaterializedInWindow
|
||||
maxRowsMaterializedInWindow,
|
||||
partitionColumnNames
|
||||
);
|
||||
}
|
||||
);
|
||||
|
@ -185,12 +194,13 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
|
|||
&& maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
|
||||
&& Objects.equals(query, that.query)
|
||||
&& Objects.equals(operatorList, that.operatorList)
|
||||
&& Objects.equals(partitionColumnNames, that.partitionColumnNames)
|
||||
&& Objects.equals(stageRowSignature, that.stageRowSignature);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
|
||||
return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,9 +24,12 @@ import com.google.common.collect.ImmutableMap;
|
|||
import org.apache.druid.frame.key.ClusterBy;
|
||||
import org.apache.druid.frame.key.KeyColumn;
|
||||
import org.apache.druid.frame.key.KeyOrder;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.msq.exec.Limits;
|
||||
import org.apache.druid.msq.input.stage.StageInputSpec;
|
||||
import org.apache.druid.msq.kernel.HashShuffleSpec;
|
||||
import org.apache.druid.msq.kernel.MixShuffleSpec;
|
||||
import org.apache.druid.msq.kernel.QueryDefinition;
|
||||
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
|
||||
import org.apache.druid.msq.kernel.ShuffleSpec;
|
||||
|
@ -39,6 +42,7 @@ import org.apache.druid.query.operator.NaiveSortOperatorFactory;
|
|||
import org.apache.druid.query.operator.OperatorFactory;
|
||||
import org.apache.druid.query.operator.WindowOperatorQuery;
|
||||
import org.apache.druid.query.operator.window.WindowOperatorFactory;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -48,6 +52,7 @@ import java.util.Map;
|
|||
|
||||
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
||||
{
|
||||
private static final Logger log = new Logger(WindowOperatorQueryKit.class);
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public WindowOperatorQueryKit(ObjectMapper jsonMapper)
|
||||
|
@ -65,13 +70,22 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
int minStageNumber
|
||||
)
|
||||
{
|
||||
// need to validate query first
|
||||
// populate the group of operators to be processed as each stage
|
||||
// the size of the operators is the number of serialized stages
|
||||
// later we should also check if these can be parallelized
|
||||
// check there is an empty over clause or not
|
||||
List<List<OperatorFactory>> operatorList = new ArrayList<>();
|
||||
boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperstors(originalQuery, operatorList);
|
||||
// Need to validate query first.
|
||||
// Populate the group of operators to be processed at each stage.
|
||||
// The size of the operators is the number of serialized stages.
|
||||
// Later we should also check if these can be parallelized.
|
||||
// Check if there is an empty OVER() clause or not.
|
||||
RowSignature rowSignature = originalQuery.getRowSignature();
|
||||
log.info("Row signature received for query is [%s].", rowSignature);
|
||||
|
||||
boolean isEmptyOverPresent = originalQuery.getOperators()
|
||||
.stream()
|
||||
.filter(of -> of instanceof NaivePartitioningOperatorFactory)
|
||||
.map(of -> (NaivePartitioningOperatorFactory) of)
|
||||
.anyMatch(of -> of.getPartitionColumns().isEmpty());
|
||||
|
||||
List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
|
||||
log.info("Created operatorList with operator factories: [%s]", operatorList);
|
||||
|
||||
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
|
||||
// add this shuffle spec to the last stage of the inner query
|
||||
|
@ -102,16 +116,14 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
|
||||
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
|
||||
final int maxRowsMaterialized;
|
||||
RowSignature rowSignature = queryToRun.getRowSignature();
|
||||
|
||||
if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) {
|
||||
maxRowsMaterialized = (int) originalQuery.context()
|
||||
.get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
|
||||
maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
|
||||
} else {
|
||||
maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
|
||||
}
|
||||
|
||||
|
||||
if (isEmptyOverFound) {
|
||||
if (isEmptyOverPresent) {
|
||||
// empty over clause found
|
||||
// moving everything to a single partition
|
||||
queryDefBuilder.add(
|
||||
|
@ -125,28 +137,59 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
queryToRun.getOperators(),
|
||||
rowSignature,
|
||||
true,
|
||||
maxRowsMaterialized
|
||||
maxRowsMaterialized,
|
||||
new ArrayList<>()
|
||||
))
|
||||
);
|
||||
} else {
|
||||
// there are multiple windows present in the query
|
||||
// Create stages for each window in the query
|
||||
// These stages will be serialized
|
||||
// the partition by clause of the next window will be the shuffle key for the previous window
|
||||
// There are multiple windows present in the query.
|
||||
// Create stages for each window in the query.
|
||||
// These stages will be serialized.
|
||||
// The partition by clause of the next window will be the shuffle key for the previous window.
|
||||
RowSignature.Builder bob = RowSignature.builder();
|
||||
final int numberOfWindows = operatorList.size();
|
||||
final int baseSize = rowSignature.size() - numberOfWindows;
|
||||
for (int i = 0; i < baseSize; i++) {
|
||||
bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
|
||||
RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
|
||||
log.info("Row signature received from last stage is [%s].", signatureFromInput);
|
||||
|
||||
for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
|
||||
bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
|
||||
}
|
||||
|
||||
List<String> partitionColumnNames = new ArrayList<>();
|
||||
|
||||
/*
|
||||
operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
|
||||
to be used for a different window stage.
|
||||
|
||||
We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
|
||||
*/
|
||||
for (int i = 0; i < operatorList.size(); i++) {
|
||||
for (OperatorFactory operatorFactory : operatorList.get(i)) {
|
||||
if (operatorFactory instanceof WindowOperatorFactory) {
|
||||
List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
|
||||
|
||||
// Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
|
||||
// since they need to be present in the row signature for this window stage.
|
||||
for (String columnName : outputColumnNames) {
|
||||
int indexInRowSignature = rowSignature.indexOf(columnName);
|
||||
if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
|
||||
ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get();
|
||||
bob.add(columnName, columnType);
|
||||
log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType);
|
||||
} else {
|
||||
throw new ISE(
|
||||
"Found unexpected column [%s] already present in row signature [%s].",
|
||||
columnName,
|
||||
rowSignature
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < numberOfWindows; i++) {
|
||||
bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
|
||||
// find the shuffle spec of the next stage
|
||||
// if it is the last stage set the next shuffle spec to single partition
|
||||
if (i + 1 == numberOfWindows) {
|
||||
nextShuffleSpec = ShuffleSpecFactories.singlePartition()
|
||||
.build(ClusterBy.none(), false);
|
||||
if (i + 1 == operatorList.size()) {
|
||||
nextShuffleSpec = MixShuffleSpec.instance();
|
||||
} else {
|
||||
nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount);
|
||||
}
|
||||
|
@ -162,6 +205,28 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
);
|
||||
}
|
||||
|
||||
log.info("Using row signature [%s] for window stage.", stageRowSignature);
|
||||
|
||||
boolean partitionOperatorExists = false;
|
||||
List<String> currentPartitionColumns = new ArrayList<>();
|
||||
for (OperatorFactory of : operatorList.get(i)) {
|
||||
if (of instanceof NaivePartitioningOperatorFactory) {
|
||||
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
|
||||
currentPartitionColumns.add(s);
|
||||
partitionOperatorExists = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (partitionOperatorExists) {
|
||||
partitionColumnNames = currentPartitionColumns;
|
||||
}
|
||||
|
||||
log.info(
|
||||
"Columns which would be used to define partitioning boundaries for this window stage are [%s]",
|
||||
partitionColumnNames
|
||||
);
|
||||
|
||||
queryDefBuilder.add(
|
||||
StageDefinition.builder(firstStageNumber + i)
|
||||
.inputs(new StageInputSpec(firstStageNumber + i - 1))
|
||||
|
@ -173,7 +238,8 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
operatorList.get(i),
|
||||
stageRowSignature,
|
||||
false,
|
||||
maxRowsMaterialized
|
||||
maxRowsMaterialized,
|
||||
partitionColumnNames
|
||||
))
|
||||
);
|
||||
}
|
||||
|
@ -184,14 +250,12 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
/**
|
||||
*
|
||||
* @param originalQuery
|
||||
* @param operatorList
|
||||
* @return true if the operator List has a partitioning operator with an empty OVER clause, false otherwise
|
||||
* @return A list of list of operator factories, where each list represents the operator factories for a particular
|
||||
* window stage.
|
||||
*/
|
||||
private boolean ifEmptyOverPresentInWindowOperstors(
|
||||
WindowOperatorQuery originalQuery,
|
||||
List<List<OperatorFactory>> operatorList
|
||||
)
|
||||
private List<List<OperatorFactory>> getOperatorListFromQuery(WindowOperatorQuery originalQuery)
|
||||
{
|
||||
List<List<OperatorFactory>> operatorList = new ArrayList<>();
|
||||
final List<OperatorFactory> operators = originalQuery.getOperators();
|
||||
List<OperatorFactory> operatorFactoryList = new ArrayList<>();
|
||||
for (OperatorFactory of : operators) {
|
||||
|
@ -203,18 +267,17 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) {
|
||||
operatorList.clear();
|
||||
operatorList.add(originalQuery.getOperators());
|
||||
return true;
|
||||
return operatorList;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return operatorList;
|
||||
}
|
||||
|
||||
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorFactories, int maxWorkerCount)
|
||||
{
|
||||
NaivePartitioningOperatorFactory partition = null;
|
||||
NaiveSortOperatorFactory sort = null;
|
||||
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
|
||||
for (OperatorFactory of : operatorFactories) {
|
||||
if (of instanceof NaivePartitioningOperatorFactory) {
|
||||
partition = (NaivePartitioningOperatorFactory) of;
|
||||
|
@ -222,29 +285,31 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
|
|||
sort = (NaiveSortOperatorFactory) of;
|
||||
}
|
||||
}
|
||||
Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
|
||||
|
||||
Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
|
||||
if (sort != null) {
|
||||
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
|
||||
colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
|
||||
sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
|
||||
}
|
||||
}
|
||||
assert partition != null;
|
||||
if (partition.getPartitionColumns().isEmpty()) {
|
||||
|
||||
if (partition == null || partition.getPartitionColumns().isEmpty()) {
|
||||
// If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
|
||||
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
|
||||
return null;
|
||||
}
|
||||
|
||||
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
|
||||
for (String partitionColumn : partition.getPartitionColumns()) {
|
||||
KeyColumn kc;
|
||||
if (colMap.containsKey(partitionColumn)) {
|
||||
if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
|
||||
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
|
||||
} else {
|
||||
if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
|
||||
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
|
||||
}
|
||||
} else {
|
||||
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
|
||||
}
|
||||
keyColsOfWindow.add(kc);
|
||||
}
|
||||
|
||||
return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.querykit;
|
||||
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.junit.Test;
|
||||
|
||||
public class WindowOperatorQueryFrameProcessorFactoryTest
|
||||
{
|
||||
@Test
|
||||
public void testEqualsAndHashcode()
|
||||
{
|
||||
EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class)
|
||||
.withNonnullFields("query", "operatorList", "stageRowSignature", "isEmptyOver", "maxRowsMaterializedInWindow", "partitionColumnNames")
|
||||
.usingGetClass()
|
||||
.verify();
|
||||
}
|
||||
}
|
|
@ -39,6 +39,7 @@ import org.apache.druid.guice.JoinableFactoryModule;
|
|||
import org.apache.druid.guice.annotations.Self;
|
||||
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
|
@ -62,12 +63,14 @@ import org.apache.druid.query.groupby.GroupingEngine;
|
|||
import org.apache.druid.query.groupby.TestGroupByBuffers;
|
||||
import org.apache.druid.segment.IndexBuilder;
|
||||
import org.apache.druid.segment.IndexIO;
|
||||
import org.apache.druid.segment.IndexSpec;
|
||||
import org.apache.druid.segment.QueryableIndex;
|
||||
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
||||
import org.apache.druid.segment.Segment;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.TestIndex;
|
||||
import org.apache.druid.segment.column.ColumnConfig;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndex;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.apache.druid.segment.loading.DataSegmentPusher;
|
||||
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
|
||||
|
@ -91,6 +94,7 @@ import javax.annotation.Nullable;
|
|||
import java.io.File;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -99,6 +103,7 @@ import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
|
|||
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
|
||||
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3;
|
||||
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5;
|
||||
import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA;
|
||||
import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS;
|
||||
import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS;
|
||||
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
|
||||
|
@ -205,6 +210,17 @@ public class CalciteMSQTestsHelper
|
|||
{
|
||||
final QueryableIndex index;
|
||||
switch (segmentId.getDataSource()) {
|
||||
case WIKIPEDIA:
|
||||
try {
|
||||
final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", UUID.randomUUID()));
|
||||
final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex();
|
||||
TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null);
|
||||
index = TestIndex.INDEX_IO.loadIndex(directory);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
break;
|
||||
case DATASOURCE1:
|
||||
IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder()
|
||||
.withMetrics(
|
||||
|
|
|
@ -126,7 +126,7 @@ public interface Operator
|
|||
*/
|
||||
STOP,
|
||||
/**
|
||||
* Inidcates that the downstream processing should pause its pushing of results and instead return a
|
||||
* Indicates that the downstream processing should pause its pushing of results and instead return a
|
||||
* continuation object that encapsulates whatever state is required to resume processing. When this signal is
|
||||
* received, Operators that are generating data might choose to exert backpressure or otherwise pause their
|
||||
* processing efforts until called again with the returned continuation object.
|
||||
|
|
|
@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class ComposingProcessor implements Processor
|
||||
{
|
||||
|
@ -37,6 +39,16 @@ public class ComposingProcessor implements Processor
|
|||
this.processors = processors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
List<String> outputColumnNames = new ArrayList<>();
|
||||
for (Processor processor : processors) {
|
||||
outputColumnNames.addAll(processor.getOutputColumnNames());
|
||||
}
|
||||
return outputColumnNames;
|
||||
}
|
||||
|
||||
@JsonProperty("processors")
|
||||
public Processor[] getProcessors()
|
||||
{
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.apache.druid.query.operator.window.value.WindowLastProcessor;
|
|||
import org.apache.druid.query.operator.window.value.WindowOffsetProcessor;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A Processor is a bit of logic that processes a single RowsAndColumns object to produce a new RowsAndColumns
|
||||
* object. Generally speaking, it is used to add or alter columns in a batch-oriented fashion.
|
||||
|
@ -80,4 +82,9 @@ public interface Processor
|
|||
* @return boolean identifying if these processors should be considered equivalent to each other.
|
||||
*/
|
||||
boolean validateEquivalent(Processor otherProcessor);
|
||||
|
||||
/**
|
||||
* @return List of output column names for the Processor.
|
||||
*/
|
||||
List<String> getOutputColumnNames();
|
||||
}
|
||||
|
|
|
@ -27,7 +27,9 @@ import org.apache.druid.query.rowsandcols.semantic.DefaultFramedOnHeapAggregatab
|
|||
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class WindowFramedAggregateProcessor implements Processor
|
||||
|
@ -45,6 +47,16 @@ public class WindowFramedAggregateProcessor implements Processor
|
|||
private final WindowFrame frame;
|
||||
private final AggregatorFactory[] aggregations;
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
List<String> outputColumnNames = new ArrayList<>();
|
||||
for (AggregatorFactory aggregation : aggregations) {
|
||||
outputColumnNames.add(aggregation.getName());
|
||||
}
|
||||
return outputColumnNames;
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public WindowFramedAggregateProcessor(
|
||||
@JsonProperty("frame") WindowFrame frame,
|
||||
|
|
|
@ -28,12 +28,20 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
|||
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class WindowPercentileProcessor implements Processor
|
||||
{
|
||||
private final int numBuckets;
|
||||
private final String outputColumn;
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
return Collections.singletonList(outputColumn);
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public WindowPercentileProcessor(
|
||||
@JsonProperty("outputColumn") String outputColumn,
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
|
|||
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
|
||||
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
|
@ -124,4 +125,9 @@ public abstract class WindowRankingProcessorBase implements Processor
|
|||
return Objects.equals(groupingCols, other.groupingCols) && Objects.equals(outputColumn, other.outputColumn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
return Collections.singletonList(outputColumn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,9 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
|
|||
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class WindowRowNumberProcessor implements Processor
|
||||
{
|
||||
private final String outputColumn;
|
||||
|
@ -128,4 +131,10 @@ public class WindowRowNumberProcessor implements Processor
|
|||
"outputColumn='" + outputColumn + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
return Collections.singletonList(outputColumn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
|||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
public abstract class WindowValueProcessorBase implements Processor
|
||||
|
@ -100,4 +102,10 @@ public abstract class WindowValueProcessorBase implements Processor
|
|||
return "inputColumn=" + inputColumn +
|
||||
", outputColumn='" + outputColumn + '\'';
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
return Collections.singletonList(outputColumn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,9 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class WindowProcessorOperatorTest
|
||||
{
|
||||
@Test
|
||||
|
@ -53,6 +56,12 @@ public class WindowProcessorOperatorTest
|
|||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
},
|
||||
InlineScanOperator.make(rac)
|
||||
);
|
||||
|
|
|
@ -23,6 +23,9 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class ComposingProcessorTest
|
||||
{
|
||||
@Test
|
||||
|
@ -32,6 +35,7 @@ public class ComposingProcessorTest
|
|||
final ProcessorForTesting secondProcessor = new ProcessorForTesting();
|
||||
|
||||
ComposingProcessor proc = new ComposingProcessor(firstProcessor, secondProcessor);
|
||||
Assert.assertTrue(proc.getOutputColumnNames().isEmpty());
|
||||
|
||||
proc.process(null);
|
||||
Assert.assertEquals(1, firstProcessor.processCounter);
|
||||
|
@ -70,5 +74,11 @@ public class ComposingProcessorTest
|
|||
++validateCounter;
|
||||
return validationResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getOutputColumnNames()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.operator.window;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
|
@ -51,6 +52,7 @@ public class WindowFramedAggregateProcessorTest
|
|||
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
|
||||
};
|
||||
WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs);
|
||||
Assert.assertEquals(ImmutableList.of("cummMax", "cummSum"), proc.getOutputColumnNames());
|
||||
|
||||
final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of(
|
||||
"yay", new IntArrayColumn(new int[]{1, 2, 3})
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
|||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
|
@ -42,6 +43,7 @@ public class WindowCumeDistProcessorTest
|
|||
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
|
||||
|
||||
Processor processor = new WindowCumeDistProcessor(Collections.singletonList("vals"), "CumeDist");
|
||||
Assert.assertEquals(Collections.singletonList("CumeDist"), processor.getOutputColumnNames());
|
||||
|
||||
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
|
||||
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
|||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
|
@ -42,6 +43,7 @@ public class WindowDenseRankProcessorTest
|
|||
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
|
||||
|
||||
Processor processor = new WindowDenseRankProcessor(Collections.singletonList("vals"), "DenseRank");
|
||||
Assert.assertEquals(Collections.singletonList("DenseRank"), processor.getOutputColumnNames());
|
||||
|
||||
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
|
||||
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.operator.window.ranking;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.query.operator.window.ComposingProcessor;
|
||||
import org.apache.druid.query.operator.window.Processor;
|
||||
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
|
||||
|
@ -29,6 +30,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
|
|||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -63,6 +65,11 @@ public class WindowPercentileProcessorTest
|
|||
new WindowPercentileProcessor("10292", 10292)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "10292"),
|
||||
processor.getOutputColumnNames()
|
||||
);
|
||||
|
||||
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
|
||||
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
.expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.operator.window.ranking;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.query.operator.window.ComposingProcessor;
|
||||
import org.apache.druid.query.operator.window.Processor;
|
||||
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
|
||||
|
@ -26,6 +27,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
|||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
|
@ -49,6 +51,8 @@ public class WindowRankProcessorTest
|
|||
new WindowRankProcessor(orderingCols, "rankAsPercent", true)
|
||||
);
|
||||
|
||||
Assert.assertEquals(ImmutableList.of("rank", "rankAsPercent"), processor.getOutputColumnNames());
|
||||
|
||||
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
|
||||
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})
|
||||
.expectColumn("rank", new int[]{1, 2, 2, 4, 5, 6, 7, 7, 9, 9})
|
||||
|
|
|
@ -28,8 +28,10 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
|
|||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -49,6 +51,7 @@ public class WindowRowNumberProcessorTest
|
|||
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
|
||||
|
||||
Processor processor = new WindowRowNumberProcessor("rowRow");
|
||||
Assert.assertEquals(Collections.singletonList("rowRow"), processor.getOutputColumnNames());
|
||||
|
||||
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
|
||||
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.operator.window.value;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.query.operator.window.ComposingProcessor;
|
||||
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
|
||||
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
||||
|
@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
|
|||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -59,6 +61,11 @@ public class WindowFirstProcessorTest
|
|||
new WindowFirstProcessor("nullFirstCol", "NullFirstCol")
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of("FirstIntCol", "FirstDoubleCol", "FirstObjectCol", "NullFirstCol"),
|
||||
processor.getOutputColumnNames()
|
||||
);
|
||||
|
||||
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
|
||||
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
.expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.operator.window.value;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.query.operator.window.ComposingProcessor;
|
||||
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
|
||||
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
||||
|
@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
|
|||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -58,6 +60,10 @@ public class WindowLastProcessorTest
|
|||
new WindowLastProcessor("objectCol", "LastObjectCol"),
|
||||
new WindowLastProcessor("nullLastCol", "NullLastCol")
|
||||
);
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of("LastIntCol", "LastDoubleCol", "LastObjectCol", "NullLastCol"),
|
||||
processor.getOutputColumnNames()
|
||||
);
|
||||
|
||||
|
||||
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
|
||||
|
|
|
@ -7533,4 +7533,78 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
|
|||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
/*
|
||||
Druid query tests
|
||||
*/
|
||||
|
||||
@DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1")
|
||||
@Test
|
||||
public void test_same_window_wikipedia_query_1()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1_named_window")
|
||||
@Test
|
||||
public void test_same_window_wikipedia_query_1_named_window()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/multiple_windows/wikipedia_query_1")
|
||||
@Test
|
||||
public void test_multiple_windows_wikipedia_query_1()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/multiple_windows/wikipedia_query_1_named_windows")
|
||||
@Test
|
||||
public void test_multiple_windows_wikipedia_query_1_named_windows()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_1")
|
||||
@Test
|
||||
public void test_shuffle_columns_wikipedia_query_1()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1")
|
||||
@Test
|
||||
public void test_shuffle_columns_wikipedia_query_1_shuffle_1()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_2")
|
||||
@Test
|
||||
public void test_shuffle_columns_wikipedia_query_2()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1")
|
||||
@Test
|
||||
public void test_shuffle_columns_wikipedia_query_2_shuffle_1()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_1")
|
||||
@Test
|
||||
public void test_partition_by_multiple_columns_wikipedia_query_1()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
|
||||
@DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_2")
|
||||
@Test
|
||||
public void test_partition_by_multiple_columns_wikipedia_query_2()
|
||||
{
|
||||
windowQueryTest();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
null Austria 1 1
|
||||
null Republic of Korea 1 2
|
||||
null Republic of Korea 2 3
|
||||
null Republic of Korea 3 4
|
||||
Horsching Austria 2 1
|
||||
Jeonju Republic of Korea 4 1
|
||||
Seongnam-si Republic of Korea 5 1
|
||||
Seoul Republic of Korea 6 1
|
||||
Suwon-si Republic of Korea 7 1
|
||||
Vienna Austria 3 1
|
||||
Vienna Austria 4 2
|
||||
Vienna Austria 5 3
|
||||
Yongsan-dong Republic of Korea 8 1
|
|
@ -0,0 +1,6 @@
|
|||
select cityName, countryName,
|
||||
row_number() over (partition by countryName order by countryName, cityName, channel) as c1,
|
||||
count(channel) over (partition by cityName order by countryName, cityName, channel) as c2
|
||||
from wikipedia
|
||||
where countryName in ('Austria', 'Republic of Korea')
|
||||
group by countryName, cityName, channel
|
|
@ -0,0 +1,13 @@
|
|||
null Austria 1 1
|
||||
null Republic of Korea 1 2
|
||||
null Republic of Korea 2 3
|
||||
null Republic of Korea 3 4
|
||||
Horsching Austria 2 1
|
||||
Jeonju Republic of Korea 4 1
|
||||
Seongnam-si Republic of Korea 5 1
|
||||
Seoul Republic of Korea 6 1
|
||||
Suwon-si Republic of Korea 7 1
|
||||
Vienna Austria 3 1
|
||||
Vienna Austria 4 2
|
||||
Vienna Austria 5 3
|
||||
Yongsan-dong Republic of Korea 8 1
|
|
@ -0,0 +1,9 @@
|
|||
select cityName, countryName,
|
||||
row_number() over w1 as c1,
|
||||
count(channel) over w2 as c2
|
||||
from wikipedia
|
||||
where countryName in ('Austria', 'Republic of Korea')
|
||||
group by countryName, cityName, channel
|
||||
WINDOW
|
||||
w1 AS (partition by countryName order by countryName, cityName, channel),
|
||||
w2 AS (partition by cityName order by countryName, cityName, channel)
|
|
@ -0,0 +1,15 @@
|
|||
Austria null 94 7
|
||||
Austria null 4685 7
|
||||
Austria null 14 7
|
||||
Austria null 0 7
|
||||
Austria null 272 7
|
||||
Austria null 0 7
|
||||
Austria null 6979 7
|
||||
Guatemala null 0 1
|
||||
Guatemala El Salvador 1 1
|
||||
Guatemala Guatemala City 173 1
|
||||
Austria Horsching 0 1
|
||||
Austria Vienna 93 4
|
||||
Austria Vienna 72 4
|
||||
Austria Vienna 0 4
|
||||
Austria Vienna 0 4
|
|
@ -0,0 +1,7 @@
|
|||
SELECT
|
||||
countryName,
|
||||
cityName,
|
||||
added,
|
||||
count(added) OVER (PARTITION BY countryName, cityName)
|
||||
FROM "wikipedia"
|
||||
where countryName in ('Guatemala', 'Austria')
|
|
@ -0,0 +1,15 @@
|
|||
Austria null 0 7 12044 1
|
||||
Austria null 0 7 12044 2
|
||||
Austria null 14 7 12044 1
|
||||
Austria null 94 7 12044 1
|
||||
Austria null 272 7 12044 1
|
||||
Austria null 4685 7 12044 1
|
||||
Austria null 6979 7 12044 1
|
||||
Guatemala null 0 1 0 1
|
||||
Guatemala El Salvador 1 1 1 1
|
||||
Guatemala Guatemala City 173 1 173 1
|
||||
Austria Horsching 0 1 0 1
|
||||
Austria Vienna 0 4 165 1
|
||||
Austria Vienna 0 4 165 2
|
||||
Austria Vienna 72 4 165 1
|
||||
Austria Vienna 93 4 165 1
|
|
@ -0,0 +1,9 @@
|
|||
SELECT
|
||||
countryName,
|
||||
cityName,
|
||||
added,
|
||||
count(added) OVER (PARTITION BY countryName, cityName),
|
||||
sum(added) OVER (PARTITION BY countryName, cityName),
|
||||
ROW_NUMBER() OVER (PARTITION BY countryName, cityName, added)
|
||||
FROM "wikipedia"
|
||||
where countryName in ('Guatemala', 'Austria')
|
|
@ -0,0 +1,15 @@
|
|||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Guatemala 167 7 174
|
||||
Guatemala 167 7 174
|
||||
Guatemala 167 7 174
|
|
@ -0,0 +1,6 @@
|
|||
SELECT countryName,
|
||||
sum("deleted") OVER (PARTITION BY countryName) as count_c3,
|
||||
sum(delta) OVER (PARTITION BY countryName) as count_c1,
|
||||
sum(added) OVER (PARTITION BY countryName) as count_c2
|
||||
FROM "wikipedia"
|
||||
where countryName in ('Guatemala', 'Austria')
|
|
@ -0,0 +1,15 @@
|
|||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Austria 162 12047 12209
|
||||
Guatemala 167 7 174
|
||||
Guatemala 167 7 174
|
||||
Guatemala 167 7 174
|
|
@ -0,0 +1,7 @@
|
|||
SELECT countryName,
|
||||
sum("deleted") OVER w as count_c3,
|
||||
sum(delta) OVER w as count_c1,
|
||||
sum(added) OVER w as count_c2
|
||||
FROM "wikipedia"
|
||||
where countryName in ('Guatemala', 'Austria')
|
||||
WINDOW w AS (PARTITION BY countryName)
|
|
@ -0,0 +1,15 @@
|
|||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Austria 1017.4166666666666
|
||||
Guatemala 58
|
||||
Guatemala 58
|
||||
Guatemala 58
|
|
@ -0,0 +1,5 @@
|
|||
SELECT
|
||||
countryName,
|
||||
AVG(added) OVER(PARTITION BY countryName)
|
||||
FROM wikipedia
|
||||
where countryName in ('Guatemala', 'Austria')
|
|
@ -0,0 +1,15 @@
|
|||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
1017.4166666666666 Austria
|
||||
58 Guatemala
|
||||
58 Guatemala
|
||||
58 Guatemala
|
|
@ -0,0 +1,5 @@
|
|||
SELECT
|
||||
AVG(added) OVER(PARTITION BY countryName),
|
||||
countryName
|
||||
FROM wikipedia
|
||||
where countryName in ('Guatemala', 'Austria')
|
|
@ -0,0 +1,16 @@
|
|||
Austria null 1 #de.wikipedia 1
|
||||
Guatemala null 1 #es.wikipedia 2
|
||||
Republic of Korea null 1 #en.wikipedia 3
|
||||
Republic of Korea null 2 #ja.wikipedia 4
|
||||
Republic of Korea null 3 #ko.wikipedia 5
|
||||
Guatemala El Salvador 2 #es.wikipedia 1
|
||||
Guatemala Guatemala City 3 #es.wikipedia 1
|
||||
Austria Horsching 2 #de.wikipedia 1
|
||||
Republic of Korea Jeonju 4 #ko.wikipedia 1
|
||||
Republic of Korea Seongnam-si 5 #ko.wikipedia 1
|
||||
Republic of Korea Seoul 6 #ko.wikipedia 1
|
||||
Republic of Korea Suwon-si 7 #ko.wikipedia 1
|
||||
Austria Vienna 3 #de.wikipedia 1
|
||||
Austria Vienna 4 #es.wikipedia 2
|
||||
Austria Vienna 5 #tr.wikipedia 3
|
||||
Republic of Korea Yongsan-dong 8 #ko.wikipedia 1
|
|
@ -0,0 +1,9 @@
|
|||
SELECT
|
||||
countryName,
|
||||
cityName,
|
||||
ROW_NUMBER() OVER(PARTITION BY countryName),
|
||||
channel,
|
||||
COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel)
|
||||
FROM wikipedia
|
||||
where countryName in ('Guatemala', 'Austria', 'Republic of Korea')
|
||||
group by countryName, cityName, channel
|
|
@ -0,0 +1,16 @@
|
|||
1 Austria null 1 #de.wikipedia
|
||||
1 Guatemala null 2 #es.wikipedia
|
||||
1 Republic of Korea null 3 #en.wikipedia
|
||||
2 Republic of Korea null 4 #ja.wikipedia
|
||||
3 Republic of Korea null 5 #ko.wikipedia
|
||||
2 Guatemala El Salvador 1 #es.wikipedia
|
||||
3 Guatemala Guatemala City 1 #es.wikipedia
|
||||
2 Austria Horsching 1 #de.wikipedia
|
||||
4 Republic of Korea Jeonju 1 #ko.wikipedia
|
||||
5 Republic of Korea Seongnam-si 1 #ko.wikipedia
|
||||
6 Republic of Korea Seoul 1 #ko.wikipedia
|
||||
7 Republic of Korea Suwon-si 1 #ko.wikipedia
|
||||
3 Austria Vienna 1 #de.wikipedia
|
||||
4 Austria Vienna 2 #es.wikipedia
|
||||
5 Austria Vienna 3 #tr.wikipedia
|
||||
8 Republic of Korea Yongsan-dong 1 #ko.wikipedia
|
|
@ -0,0 +1,9 @@
|
|||
SELECT
|
||||
ROW_NUMBER() OVER(PARTITION BY countryName),
|
||||
countryName,
|
||||
cityName,
|
||||
COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel),
|
||||
channel
|
||||
FROM wikipedia
|
||||
where countryName in ('Guatemala', 'Austria', 'Republic of Korea')
|
||||
group by countryName, cityName, channel
|
Loading…
Reference in New Issue