Add GlueingPartitioningOperator + Corresponding changes in window function layer to consume it for MSQ (#17038)

*    GlueingPartitioningOperator: continuously receives data and outputs batches of partitioned RACs. It maintains the last partitioning boundary of the last-pushed RAC and attempts to glue it with the next RAC it receives, ensuring that partitions are handled correctly even across multiple RACs. See GlueingPartitioningOperatorTest for good examples of the "glueing" behaviour, and the sketch after this list for the core idea.
*    PartitionSortOperator: sorts rows inside partitioned RACs on the sort columns. The input RACs it receives are expected to be complete, separate partitions of the data.
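
For intuition, here is a minimal, self-contained sketch of the glueing idea in plain Java. It deliberately does not use Druid's Operator / RowsAndColumns APIs; every class, method, and field name below is illustrative only, and it assumes a single partition column identified by its index.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Conceptual sketch only -- not the actual GlueingPartitioningOperator.
// Core idea: hold back the trailing (possibly incomplete) partition of each
// batch and glue it onto the next batch when more rows arrive.
class GlueingSketch
{
  private List<Object[]> heldBackPartition = new ArrayList<>(); // rows of the last, possibly incomplete partition
  private final int partitionKeyIndex;

  GlueingSketch(int partitionKeyIndex)
  {
    this.partitionKeyIndex = partitionKeyIndex;
  }

  // Receives one batch of rows (a stand-in for a RAC) and returns the partitions known to be complete.
  List<List<Object[]>> push(List<Object[]> batch)
  {
    final List<Object[]> rows = new ArrayList<>(heldBackPartition);
    rows.addAll(batch); // glue the held-back rows onto the new batch

    final List<List<Object[]>> completePartitions = new ArrayList<>();
    List<Object[]> current = new ArrayList<>();
    for (Object[] row : rows) {
      if (!current.isEmpty()
          && !Objects.equals(current.get(0)[partitionKeyIndex], row[partitionKeyIndex])) {
        completePartitions.add(current); // partition boundary crossed: previous partition is complete
        current = new ArrayList<>();
      }
      current.add(row);
    }
    heldBackPartition = current; // the last partition may continue in the next batch, so hold it back
    return completePartitions;
  }

  // Called once the input is finished; the held-back partition is now known to be complete.
  List<Object[]> finish()
  {
    final List<Object[]> last = heldBackPartition;
    heldBackPartition = new ArrayList<>();
    return last;
  }
}

Each push() emits only partitions whose boundary has been crossed and holds back the tail, mirroring how the real operator only signals completed() once the input channel is finished.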
Akshat Jain 2024-10-17 10:54:52 +05:30 committed by GitHub
parent 90175b8927
commit 450fb0147b
32 changed files with 1679 additions and 783 deletions


@ -34,29 +34,23 @@ import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@ -66,37 +60,25 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
{
private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class);
private final WindowOperatorQuery query;
private final List<OperatorFactory> operatorFactoryList;
private final List<String> partitionColumnNames;
private final ObjectMapper jsonMapper;
private final ArrayList<RowsAndColumns> frameRowsAndCols;
private final ArrayList<RowsAndColumns> resultRowAndCols;
private final RowsAndColumnsBuilder frameRowsAndColsBuilder;
private final ReadableFrameChannel inputChannel;
private final WritableFrameChannel outputChannel;
private final FrameWriterFactory frameWriterFactory;
private final FrameReader frameReader;
private final int maxRowsMaterialized;
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
private ResultRow outputRow = null;
private FrameWriter frameWriter = null;
private final VirtualColumns frameWriterVirtualColumns;
private final SettableLongVirtualColumn partitionBoostVirtualColumn;
// List of type strategies to compare the partition columns across rows.
// Type strategies are pushed in the same order as column types in frameReader.signature()
private final NullableTypeStrategy[] typeStrategies;
private final ArrayList<ResultRow> rowsToProcess;
private int lastPartitionIndex = -1;
private Operator operator = null;
final AtomicInteger rowId = new AtomicInteger(0);
@ -107,29 +89,18 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
FrameWriterFactory frameWriterFactory,
FrameReader frameReader,
ObjectMapper jsonMapper,
final List<OperatorFactory> operatorFactoryList,
final RowSignature rowSignature,
final int maxRowsMaterializedInWindow,
final List<String> partitionColumnNames
final List<OperatorFactory> operatorFactoryList
)
{
this.inputChannel = inputChannel;
this.outputChannel = outputChannel;
this.frameWriterFactory = frameWriterFactory;
this.operatorFactoryList = operatorFactoryList;
this.jsonMapper = jsonMapper;
this.query = query;
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.rowsToProcess = new ArrayList<>();
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context());
this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized);
this.frameReader = frameReader;
this.typeStrategies = new NullableTypeStrategy[frameReader.signature().size()];
for (int i = 0; i < frameReader.signature().size(); i++) {
typeStrategies[i] = frameReader.signature().getColumnType(i).get().getNullableStrategy();
}
// Get virtual columns to be added to the frame writer.
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
@ -141,6 +112,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
}
frameWriterVirtualColumns.add(this.partitionBoostVirtualColumn);
this.frameWriterVirtualColumns = VirtualColumns.create(frameWriterVirtualColumns);
initialiseOperator();
}
@Override
@ -158,174 +131,68 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
@Override
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
{
/*
There are 2 scenarios:
*** Scenario 1: Query has at least one window function with an OVER() clause without a PARTITION BY ***
In this scenario, we add all the RACs to a single RowsAndColumns to be processed. We do it via ConcatRowsAndColumns, and run all the operators on the ConcatRowsAndColumns.
This is done because we anyway need to run the operators on the entire set of rows when we have an OVER() clause without a PARTITION BY.
This scenario corresponds to partitionColumnNames.isEmpty()=true code flow.
*** Scenario 2: All window functions in the query have an OVER() clause with a PARTITION BY ***
In this scenario, we need to process rows for each PARTITION BY group together, but we can batch multiple PARTITION BY keys into the same RAC before passing it to the operators for processing.
Batching is fine since the operators list would have the required NaivePartitioningOperatorFactory to segregate each PARTITION BY group during the processing.
The flow for this scenario can be summarised as follows:
1. Frame Reading and Cursor Initialization: We start by reading a frame from the inputChannel and initializing frameCursor to iterate over the rows in that frame.
2. Row Comparison: For each row in the frame, we decide whether it belongs to the same PARTITION BY group as the previous row.
This is determined by comparePartitionKeys() method.
Please refer to the Javadoc of that method for further details and an example illustration.
2.1. If the PARTITION BY columns of the current row match the PARTITION BY columns of the previous row,
the two rows belong to the same PARTITION BY group, and the current row gets added to rowsToProcess.
If the total number of rows materialized exceeds maxRowsMaterialized, we process the pending batch via the processRowsUpToLastPartition() method.
2.2. If they don't match, then we have reached a partition boundary.
In this case, we update the value for lastPartitionIndex.
3. End of Input: If the input channel is finished, any remaining rows in rowsToProcess are processed.
*Illustration of Row Comparison step*
Let's say we have window_function() OVER (PARTITION BY A ORDER BY B) in our query, and we get 3 frames in the input channel to process.
Frame 1
A, B
1, 2
1, 3
2, 1 --> PARTITION BY key (column A) changed from 1 to 2.
2, 2
Frame 2
A, B
3, 1 --> PARTITION BY key (column A) changed from 2 to 3.
3, 2
3, 3
3, 4
Frame 3
A, B
3, 5
3, 6
4, 1 --> PARTITION BY key (column A) changed from 3 to 4.
4, 2
*Why batching?*
We batch multiple PARTITION BY keys for processing together to avoid the overhead of creating a different RAC for each PARTITION BY key, which would be unnecessary in scenarios where we have a large number of PARTITION BY keys but each key has only a single row.
*Future thoughts: https://github.com/apache/druid/issues/16126*
The current approach with R&C and operators materializes a single R&C for processing. For data with low cardinality, a single R&C might be too big to consume; the same applies to the empty OVER() clause case.
Most window operations like SUM(), RANK(), RANGE() etc. can be done with 2 passes over the data. We might reimplement them in the MSQ way so that we do not have to materialize so much data.
*/
// If there are rows pending flush, flush them and run again before processing any more rows.
if (frameHasRowsPendingFlush()) {
flushAllRowsAndCols();
return ReturnOrAwait.runAgain();
}
if (partitionColumnNames.isEmpty()) {
// Scenario 1: Query has at least one window function with an OVER() clause without a PARTITION BY.
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
convertRowFrameToRowsAndColumns(frame);
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
LazilyDecoratedRowsAndColumns ldrc = convertRowFrameToRowsAndColumns(frame);
frameRowsAndColsBuilder.add(ldrc);
if (needToProcessBatch()) {
runAllOpsOnBatch();
flushAllRowsAndCols();
}
return ReturnOrAwait.runAgain();
} else if (inputChannel.isFinished()) {
if (rowId.get() == 0) {
runAllOpsOnBatch();
}
// If there are still rows pending after operations, run again.
if (frameHasRowsPendingFlush()) {
return ReturnOrAwait.runAgain();
}
if (inputChannel.isFinished()) {
// If no rows are flushed yet, process all rows.
if (rowId.get() == 0) {
runAllOpsOnMultipleRac(frameRowsAndCols);
}
// If there are still rows pending after operations, run again.
if (frameHasRowsPendingFlush()) {
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
// Scenario 2: All window functions in the query have an OVER() clause with a PARTITION BY
if (frameCursor == null || frameCursor.isDone()) {
if (readableInputs.isEmpty()) {
return ReturnOrAwait.awaitAll(1);
}
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
makeRowSupplierFromFrameCursor();
} else if (inputChannel.isFinished()) {
// If we have some rows pending processing, process them.
// We run it again as it's possible that frame writer's capacity got reached and some output rows are
// pending flush to the output channel.
if (!rowsToProcess.isEmpty()) {
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
}
}
while (!frameCursor.isDone()) {
final ResultRow currentRow = rowSupplierFromFrameCursor.get();
if (outputRow == null) {
outputRow = currentRow;
rowsToProcess.add(currentRow);
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// Add current row to the same batch of rows for processing.
rowsToProcess.add(currentRow);
} else {
lastPartitionIndex = rowsToProcess.size() - 1;
outputRow = currentRow.copy();
rowsToProcess.add(currentRow);
}
frameCursor.advance();
if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
return ReturnOrAwait.runAgain();
}
}
return ReturnOrAwait.runAgain();
}
/**
* @param listOfRacs Concat this list of {@link RowsAndColumns} to a {@link ConcatRowsAndColumns} to use as a single input for the operators to be run
*/
private void runAllOpsOnMultipleRac(ArrayList<RowsAndColumns> listOfRacs)
private void initialiseOperator()
{
Operator op = new Operator()
operator = new Operator()
{
@Nullable
@Override
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
{
RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs);
ensureMaxRowsInAWindowConstraint(rac.numRows());
RowsAndColumns rac = frameRowsAndColsBuilder.build();
ensureMaxRowsInAWindowConstraint(rac.numRows(), maxRowsMaterialized);
receiver.push(rac);
receiver.completed();
return null;
if (inputChannel.isFinished()) {
// Only call completed() when the input channel is finished.
receiver.completed();
return null; // Signal that the operator has completed its work
}
// Return a non-null continuation object to indicate that we want to continue processing.
return () -> {
};
}
};
runOperatorsAfterThis(op);
for (OperatorFactory of : operatorFactoryList) {
operator = of.wrap(operator);
}
}
/**
* @param op Base operator for the operators to be run. Other operators are wrapped under this to run
*/
private void runOperatorsAfterThis(Operator op)
private void runAllOpsOnBatch()
{
for (OperatorFactory of : operatorFactoryList) {
op = of.wrap(op);
}
Operator.go(op, new Operator.Receiver()
operator.goOrContinue(null, new Operator.Receiver()
{
@Override
public Operator.Signal push(RowsAndColumns rac)
@ -349,6 +216,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
/**
* Flushes {@link #resultRowAndCols} to the frame starting from {@link #rowId}, up to the frame writer's capacity.
*
* @throws IOException
*/
private void flushAllRowsAndCols() throws IOException
@ -359,7 +227,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
}
/**
* @param rac The frame writer to write this {@link RowsAndColumns} object
* @param rac The frame writer to write this {@link RowsAndColumns} object
*/
private void createFrameWriterIfNeeded(RowsAndColumns rac)
{
@ -373,7 +241,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
}
/**
* @param rac {@link RowsAndColumns} to be written to frame
* @param rac {@link RowsAndColumns} to be written to frame
* @throws IOException
*/
public void writeRacToFrame(RowsAndColumns rac) throws IOException
@ -432,9 +300,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
/**
* @param frame Row based frame to be converted to a {@link RowsAndColumns} object
* Throw an exception if the resultant rac used goes above the guardrail value
* Throw an exception if the resultant rac used goes above the guardrail value
* @return A {@link LazilyDecoratedRowsAndColumns} encapsulating the frame.
*/
private void convertRowFrameToRowsAndColumns(Frame frame)
private LazilyDecoratedRowsAndColumns convertRowFrameToRowsAndColumns(Frame frame)
{
final RowSignature signature = frameReader.signature();
RowBasedFrameRowsAndColumns frameRowsAndColumns = new RowBasedFrameRowsAndColumns(frame, signature);
@ -445,94 +314,13 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
null,
OffsetLimit.limit(Integer.MAX_VALUE),
null,
null
null,
frameWriterFactory.allocatorCapacity()
);
// check if existing + newly added rows exceed guardrails
ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows());
frameRowsAndCols.add(ldrc);
return ldrc;
}
/**
* Compare two rows based on the columns in partitionColumnNames.
* If the partitionColumnNames is empty, the method will end up returning true.
* <p>
* For example, say:
* <ul>
* <li>partitionColumnNames = ["d1", "d2"]</li>
* <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
* <li>frameReader.signature.indexOf("d1") = 0</li>
* <li>frameReader.signature.indexOf("d2") = 1</li>
* <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
* <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
* </ul>
* <p>
* Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise.
* Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
*/
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
{
int match = 0;
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
if (ColumnType.STRING.equals(frameReader.signature().getColumnType(columnName).get()) && (row1.get(i) instanceof List || row2.get(i) instanceof List)) {
// special handling to reject MVDs
throw new UOE(
"Encountered a multi value column [%s]. Window processing does not support MVDs. "
+ "Consider using UNNEST or MV_TO_ARRAY.",
columnName
);
}
if (typeStrategies[i].compare(row1.get(i), row2.get(i)) == 0) {
match++;
}
}
return match == partitionColumnNames.size();
}
private void makeRowSupplierFromFrameCursor()
{
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
for (int i = 0; i < fieldSuppliers.length; i++) {
final ColumnValueSelector<?> selector =
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
fieldSuppliers[i] = selector::getObject;
}
rowSupplierFromFrameCursor = () -> {
final ResultRow row = ResultRow.create(fieldSuppliers.length);
for (int i = 0; i < fieldSuppliers.length; i++) {
row.set(i, fieldSuppliers[i].get());
}
return row;
};
}
/**
* Process rows from rowsToProcess[0, lastPartitionIndex].
*/
private void processRowsUpToLastPartition()
{
if (lastPartitionIndex == -1) {
return;
}
RowsAndColumns singleRac = MapOfColumnsRowsAndColumns.fromResultRowTillIndex(
rowsToProcess,
frameReader.signature(),
lastPartitionIndex
);
ArrayList<RowsAndColumns> rowsAndColumns = new ArrayList<>();
rowsAndColumns.add(singleRac);
runAllOpsOnMultipleRac(rowsAndColumns);
// Remove elements in the range [0, lastPartitionIndex] from the list.
// The call to list.subList(a, b).clear() deletes the elements in the range [a, b - 1],
// causing the remaining elements to shift and start from index 0.
rowsToProcess.subList(0, lastPartitionIndex + 1).clear();
lastPartitionIndex = -1;
}
private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow)
private static void ensureMaxRowsInAWindowConstraint(int numRowsInWindow, int maxRowsMaterialized)
{
if (numRowsInWindow > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
@ -542,6 +330,50 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
}
}
private boolean needToProcessBatch()
{
return frameRowsAndColsBuilder.getNumRows() >= maxRowsMaterialized / 2; // Can this be improved further?
}
private static class RowsAndColumnsBuilder
{
private final List<RowsAndColumns> racList;
private int totalRows;
private final int maxRowsMaterialized;
public RowsAndColumnsBuilder(int maxRowsMaterialized)
{
this.racList = new ArrayList<>();
this.totalRows = 0;
this.maxRowsMaterialized = maxRowsMaterialized;
}
public void add(RowsAndColumns rac)
{
racList.add(rac);
totalRows += rac.numRows();
ensureMaxRowsInAWindowConstraint(getNumRows(), maxRowsMaterialized);
}
public int getNumRows()
{
return totalRows;
}
public RowsAndColumns build()
{
ConcatRowsAndColumns concatRowsAndColumns = new ConcatRowsAndColumns(new ArrayList<>(racList));
clear();
return concatRowsAndColumns;
}
public void clear()
{
racList.clear();
totalRows = 0;
}
}
/**
* Increments the value of the partition boosting column. It should be called once the row value has been written
* to the frame
@ -561,7 +393,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private void clearRACBuffers()
{
frameRowsAndCols.clear();
resultRowAndCols.clear();
rowId.set(0);
}


@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
@ -60,27 +59,17 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
private final WindowOperatorQuery query;
private final List<OperatorFactory> operatorList;
private final RowSignature stageRowSignature;
private final int maxRowsMaterializedInWindow;
private final List<String> partitionColumnNames;
@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(
@JsonProperty("query") WindowOperatorQuery query,
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
@JsonProperty("partitionColumnNames") List<String> partitionColumnNames
@JsonProperty("stageRowSignature") RowSignature stageRowSignature
)
{
this.query = Preconditions.checkNotNull(query, "query");
this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator");
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
if (partitionColumnNames == null) {
throw DruidException.defensive("List of partition column names encountered as null.");
}
this.partitionColumnNames = partitionColumnNames;
}
@JsonProperty("query")
@ -95,24 +84,12 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
return operatorList;
}
@JsonProperty("partitionColumnNames")
public List<String> getPartitionColumnNames()
{
return partitionColumnNames;
}
@JsonProperty("stageRowSignature")
public RowSignature getSignature()
{
return stageRowSignature;
}
@JsonProperty("maxRowsMaterializedInWindow")
public int getMaxRowsMaterializedInWindow()
{
return maxRowsMaterializedInWindow;
}
@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
StageDefinition stageDefinition,
@ -153,6 +130,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
readableInput -> {
final OutputChannel outputChannel =
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());
return new WindowOperatorQueryFrameProcessor(
query,
readableInput.getChannel(),
@ -160,10 +138,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes),
readableInput.getChannelFrameReader(),
frameContext.jsonMapper(),
operatorList,
stageRowSignature,
maxRowsMaterializedInWindow,
partitionColumnNames
operatorList
);
}
);
@ -190,16 +165,14 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
return false;
}
WindowOperatorQueryFrameProcessorFactory that = (WindowOperatorQueryFrameProcessorFactory) o;
return maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
&& Objects.equals(query, that.query)
return Objects.equals(query, that.query)
&& Objects.equals(operatorList, that.operatorList)
&& Objects.equals(partitionColumnNames, that.partitionColumnNames)
&& Objects.equals(stageRowSignature, that.stageRowSignature);
}
@Override
public int hashCode()
{
return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, maxRowsMaterializedInWindow);
return Objects.hash(query, operatorList, stageRowSignature);
}
}


@ -26,7 +26,6 @@ import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.HashShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
@ -35,10 +34,12 @@ import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory;
import org.apache.druid.query.operator.AbstractSortOperatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.NaiveSortOperatorFactory;
import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.PartitionSortOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
@ -68,20 +69,9 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
int minStageNumber
)
{
// Need to validate query first.
// Populate the group of operators to be processed at each stage.
// The size of the operators is the number of serialized stages.
// Later we should also check if these can be parallelized.
// Check if there is an empty OVER() clause or not.
RowSignature rowSignature = originalQuery.getRowSignature();
log.info("Row signature received for query is [%s].", rowSignature);
boolean isEmptyOverPresent = originalQuery.getOperators()
.stream()
.filter(of -> of instanceof NaivePartitioningOperatorFactory)
.map(of -> (NaivePartitioningOperatorFactory) of)
.anyMatch(of -> of.getPartitionColumns().isEmpty());
List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
log.info("Created operatorList with operator factories: [%s]", operatorList);
@ -112,135 +102,84 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
final ShuffleSpec finalWindowStageShuffleSpec = resultShuffleSpecFactory.build(finalWindowClusterBy, false);
final RowSignature finalWindowStageRowSignature = computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy, segmentGranularity);
final int maxRowsMaterialized;
if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) {
maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
} else {
maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
final int maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(originalQuery.context());
// There are multiple windows present in the query.
// Create stages for each window in the query.
// These stages will be serialized.
// The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
log.info("Row signature received from last stage is [%s].", signatureFromInput);
for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
}
if (isEmptyOverPresent) {
// Move everything to a single partition since we have to load all the data on a single worker anyway to compute empty over() clause.
log.info(
"Empty over clause is present in the query. Creating a single stage with all operator factories [%s].",
queryToRun.getOperators()
);
/*
operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
to be used for a different window stage.
We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
*/
for (int i = 0; i < operatorList.size(); i++) {
for (OperatorFactory operatorFactory : operatorList.get(i)) {
if (operatorFactory instanceof WindowOperatorFactory) {
List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
// Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
// since they need to be present in the row signature for this window stage.
for (String columnName : outputColumnNames) {
int indexInRowSignature = rowSignature.indexOf(columnName);
if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get();
bob.add(columnName, columnType);
log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType);
} else {
throw new ISE(
"Found unexpected column [%s] already present in row signature [%s].",
columnName,
rowSignature
);
}
}
}
}
final RowSignature intermediateSignature = bob.build();
final RowSignature stageRowSignature;
if (i + 1 == operatorList.size()) {
stageRowSignature = finalWindowStageRowSignature;
nextShuffleSpec = finalWindowStageShuffleSpec;
} else {
nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle());
if (nextShuffleSpec == null) {
stageRowSignature = intermediateSignature;
} else {
stageRowSignature = QueryKitUtils.sortableSignature(
intermediateSignature,
nextShuffleSpec.clusterBy().getColumns()
);
}
}
log.info("Using row signature [%s] for window stage.", stageRowSignature);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber)
.inputs(new StageInputSpec(firstStageNumber - 1))
.signature(finalWindowStageRowSignature)
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
.signature(stageRowSignature)
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
.shuffleSpec(finalWindowStageShuffleSpec)
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
queryToRun.getOperators(),
finalWindowStageRowSignature,
maxRowsMaterialized,
Collections.emptyList()
getOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized),
stageRowSignature
))
);
} else {
// There are multiple windows present in the query.
// Create stages for each window in the query.
// These stages will be serialized.
// The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
log.info("Row signature received from last stage is [%s].", signatureFromInput);
for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
}
List<String> partitionColumnNames = new ArrayList<>();
/*
operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
to be used for a different window stage.
We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
*/
for (int i = 0; i < operatorList.size(); i++) {
for (OperatorFactory operatorFactory : operatorList.get(i)) {
if (operatorFactory instanceof WindowOperatorFactory) {
List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
// Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
// since they need to be present in the row signature for this window stage.
for (String columnName : outputColumnNames) {
int indexInRowSignature = rowSignature.indexOf(columnName);
if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get();
bob.add(columnName, columnType);
log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType);
} else {
throw new ISE(
"Found unexpected column [%s] already present in row signature [%s].",
columnName,
rowSignature
);
}
}
}
}
final RowSignature intermediateSignature = bob.build();
final RowSignature stageRowSignature;
if (i + 1 == operatorList.size()) {
stageRowSignature = finalWindowStageRowSignature;
nextShuffleSpec = finalWindowStageShuffleSpec;
} else {
nextShuffleSpec =
findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle());
if (nextShuffleSpec == null) {
stageRowSignature = intermediateSignature;
} else {
stageRowSignature = QueryKitUtils.sortableSignature(
intermediateSignature,
nextShuffleSpec.clusterBy().getColumns()
);
}
}
log.info("Using row signature [%s] for window stage.", stageRowSignature);
boolean partitionOperatorExists = false;
List<String> currentPartitionColumns = new ArrayList<>();
for (OperatorFactory of : operatorList.get(i)) {
if (of instanceof NaivePartitioningOperatorFactory) {
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
currentPartitionColumns.add(s);
partitionOperatorExists = true;
}
}
}
if (partitionOperatorExists) {
partitionColumnNames = currentPartitionColumns;
}
log.info(
"Columns which would be used to define partitioning boundaries for this window stage are [%s]",
partitionColumnNames
);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
.signature(stageRowSignature)
.maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
.shuffleSpec(nextShuffleSpec)
.processorFactory(new WindowOperatorQueryFrameProcessorFactory(
queryToRun,
operatorList.get(i),
stageRowSignature,
maxRowsMaterialized,
partitionColumnNames
))
);
}
}
return queryDefBuilder.build();
}
@ -287,13 +226,13 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorFactories, int partitionCount)
{
NaivePartitioningOperatorFactory partition = null;
NaiveSortOperatorFactory sort = null;
AbstractPartitioningOperatorFactory partition = null;
AbstractSortOperatorFactory sort = null;
for (OperatorFactory of : operatorFactories) {
if (of instanceof NaivePartitioningOperatorFactory) {
partition = (NaivePartitioningOperatorFactory) of;
} else if (of instanceof NaiveSortOperatorFactory) {
sort = (NaiveSortOperatorFactory) of;
if (of instanceof AbstractPartitioningOperatorFactory) {
partition = (AbstractPartitioningOperatorFactory) of;
} else if (of instanceof AbstractSortOperatorFactory) {
sort = (AbstractSortOperatorFactory) of;
}
}
@ -377,4 +316,37 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
finalWindowClusterBy.getColumns()
);
}
/**
* This method converts the operator chain received from the native plan into the MSQ plan.
* (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator).
* We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage.
* This conversion allows us to blindly read N rows from the input channel and push them into the operator chain, repeating until the input channel is finished.
* @param operatorFactoryListFromQuery
* @param maxRowsMaterializedInWindow
* @return
*/
private List<OperatorFactory> getOperatorFactoryListForStageDefinition(List<OperatorFactory> operatorFactoryListFromQuery, int maxRowsMaterializedInWindow)
{
final List<OperatorFactory> operatorFactoryList = new ArrayList<>();
final List<OperatorFactory> sortOperatorFactoryList = new ArrayList<>();
for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) {
if (operatorFactory instanceof AbstractPartitioningOperatorFactory) {
AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory;
operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow));
} else if (operatorFactory instanceof AbstractSortOperatorFactory) {
AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory;
sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns()));
} else {
// Add all the PartitionSortOperator(s) before every window operator.
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
operatorFactoryList.add(operatorFactory);
}
}
operatorFactoryList.addAll(sortOperatorFactoryList);
sortOperatorFactoryList.clear();
return operatorFactoryList;
}
}


@ -209,6 +209,14 @@ public class MultiStageQueryContext
);
}
public static int getMaxRowsMaterializedInWindow(final QueryContext queryContext)
{
return queryContext.getInt(
MAX_ROWS_MATERIALIZED_IN_WINDOW,
Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW
);
}
public static int getMaxConcurrentStagesWithDefault(
final QueryContext queryContext,
final int defaultMaxConcurrentStages


@ -2219,65 +2219,107 @@ public class MSQWindowTest extends MSQTestBase
2, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(13).bytes(1379).frames(1),
CounterSnapshotMatcher.with().rows(13).bytes(989).frames(1),
2, 0, "output"
)
// Stage 3, Worker 0
// Stage 3, Worker 1 (window stage)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(318).frames(1),
3, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(330).frames(1),
3, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(318).frames(1),
3, 0, "shuffle"
)
// Stage 3, Worker 1
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 3).bytes(0, 333).frames(0, 1),
CounterSnapshotMatcher.with().rows(0, 6).bytes(0, 461).frames(0, 1),
3, 1, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(345).frames(1),
CounterSnapshotMatcher.with().rows(0, 6).bytes(0, 641).frames(0, 1),
3, 1, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(333).frames(1),
CounterSnapshotMatcher.with().rows(1, 1, 2, 2).bytes(122, 132, 230, 235).frames(1, 1, 1, 1),
3, 1, "shuffle"
)
// Stage 3, Worker 2
// Stage 3, Worker 2 (window stage)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 3).bytes(0, 0, 352).frames(0, 0, 1),
CounterSnapshotMatcher.with().rows(0, 0, 1).bytes(0, 0, 114).frames(0, 0, 1),
3, 2, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(364).frames(1),
CounterSnapshotMatcher.with().rows(0, 0, 1).bytes(0, 0, 144).frames(0, 0, 1),
3, 2, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(352).frames(1),
CounterSnapshotMatcher.with().rows(1).bytes(140).frames(1),
3, 2, "shuffle"
)
// Stage 3, Worker 3
// Stage 3, Worker 3 (window stage)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 0, 4).bytes(0, 0, 0, 426).frames(0, 0, 0, 1),
CounterSnapshotMatcher.with().rows(0, 0, 0, 6).bytes(0, 0, 0, 482).frames(0, 0, 0, 1),
3, 3, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(442).frames(1),
CounterSnapshotMatcher.with().rows(0, 0, 0, 6).bytes(0, 0, 0, 662).frames(0, 0, 0, 1),
3, 3, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(426).frames(1),
CounterSnapshotMatcher.with().rows(1, 1, 2, 2).bytes(143, 137, 222, 238).frames(1, 1, 1, 1),
3, 3, "shuffle"
)
// Stage 4, Worker 0
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(337).frames(1),
4, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(349).frames(1),
4, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(3).bytes(337).frames(1),
4, 0, "shuffle"
)
// Stage 4, Worker 1
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 2).bytes(0, 235).frames(0, 1),
4, 1, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(2).bytes(243).frames(1),
4, 1, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(2).bytes(235).frames(1),
4, 1, "shuffle"
)
// Stage 4, Worker 2
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 4).bytes(0, 0, 418).frames(0, 0, 1),
4, 2, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(434).frames(1),
4, 2, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(418).frames(1),
4, 2, "shuffle"
)
// Stage 4, Worker 3
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(0, 0, 0, 4).bytes(0, 0, 0, 439).frames(0, 0, 0, 1),
4, 3, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(455).frames(1),
4, 3, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher.with().rows(4).bytes(439).frames(1),
4, 3, "shuffle"
)
.verifyResults();
}
@ -2331,7 +2373,7 @@ public class MSQWindowTest extends MSQTestBase
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
"Encountered a multi value column. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
))
.verifyExecutionError();
}
@ -2350,7 +2392,7 @@ public class MSQWindowTest extends MSQTestBase
.setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(ISE.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
"Encountered a multi value column. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
))
.verifyExecutionError();
}


@ -28,7 +28,7 @@ public class WindowOperatorQueryFrameProcessorFactoryTest
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class)
.withNonnullFields("query", "operatorList", "stageRowSignature", "maxRowsMaterializedInWindow", "partitionColumnNames")
.withNonnullFields("query", "operatorList", "stageRowSignature")
.usingGetClass()
.verify();
}


@ -22,8 +22,7 @@ package org.apache.druid.msq.querykit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.allocation.HeapMemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.read.FrameReader;
@ -39,6 +38,7 @@ import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
@ -115,7 +115,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
new ArenaMemoryAllocatorFactory(1 << 20),
outputSignature,
Collections.emptyList(),
false
@ -133,10 +133,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
new ObjectMapper(),
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
inputSignature,
100,
ImmutableList.of("added")
)
);
exec.runFully(processor, null);
@ -160,47 +157,26 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
}
@Test
public void testBatchingOfPartitionByKeys_singleBatch() throws Exception
public void testProcessorRun() throws Exception
{
// With maxRowsMaterialized=100, we will get 1 frame:
// [1, 1, 2, 2, 2, 3, 3]
validateBatching(100, 1);
runProcessor(100, 1);
}
@Test
public void testBatchingOfPartitionByKeys_multipleBatches_1() throws Exception
{
// With maxRowsMaterialized=5, we will get 2 frames:
// [1, 1, 2, 2, 2]
// [3, 3]
validateBatching(5, 2);
}
@Test
public void testBatchingOfPartitionByKeys_multipleBatches_2() throws Exception
{
// With maxRowsMaterialized=4, we will get 3 frames:
// [1, 1]
// [2, 2, 2]
// [3, 3]
validateBatching(4, 3);
}
@Test
public void testBatchingOfPartitionByKeys_TooManyRowsInAWindowFault()
public void testMaxRowsMaterializedConstraint()
{
final RuntimeException e = Assert.assertThrows(
RuntimeException.class,
() -> validateBatching(2, 3)
() -> runProcessor(2, 3)
);
MatcherAssert.assertThat(
((MSQException) e.getCause().getCause()).getFault(),
CoreMatchers.instanceOf(TooManyRowsInAWindowFault.class)
);
Assert.assertTrue(e.getMessage().contains("TooManyRowsInAWindow: Too many rows in a window (requested = 3, max = 2)"));
Assert.assertTrue(e.getMessage().contains("TooManyRowsInAWindow: Too many rows in a window (requested = 7, max = 2)"));
}
public void validateBatching(int maxRowsMaterialized, int numFramesWritten) throws Exception
public void runProcessor(int maxRowsMaterialized, int expectedNumFramesWritten) throws Exception
{
final ReadableInput factChannel = buildWindowTestInputChannel();
@ -234,7 +210,9 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
.context(new HashMap<>())
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
new HashMap<>(),
new HashMap<>(
ImmutableMap.of(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, maxRowsMaterialized)
),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
@ -245,7 +223,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
// Limit output frames to 1 row to ensure we test edge cases
final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
new ArenaMemoryAllocatorFactory(1 << 20),
outputSignature,
Collections.emptyList(),
false
@ -262,10 +240,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
new ObjectMapper(),
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
inputSignature,
maxRowsMaterialized,
ImmutableList.of("added")
)
);
exec.runFully(processor, null);
@ -278,7 +253,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
final List<List<Object>> rows = rowsFromProcessor.toList();
long actualNumFrames = Arrays.stream(channelCounters.snapshot().getFrames()).findFirst().getAsLong();
Assert.assertEquals(numFramesWritten, actualNumFrames);
Assert.assertEquals(expectedNumFramesWritten, actualNumFrames);
Assert.assertEquals(7, rows.size());
}


@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractPartitioningOperator implements Operator
{
protected final List<String> partitionColumns;
protected final Operator child;
public AbstractPartitioningOperator(
List<String> partitionColumns,
Operator child
)
{
this.partitionColumns = partitionColumns;
this.child = child;
}
@Override
public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
if (continuation != null) {
Continuation cont = (Continuation) continuation;
if (cont.iter != null) {
HandleContinuationResult handleContinuationResult = handleContinuation(receiver, cont);
if (!handleContinuationResult.needToContinueProcessing()) {
return handleContinuationResult.getContinuation();
}
if (cont.subContinuation == null) {
receiver.completed();
return null;
}
}
continuation = cont.subContinuation;
}
AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
final Closeable retVal = child.goOrContinue(
continuation,
createReceiver(receiver, iterHolder)
);
if (iterHolder.get() != null || retVal != null) {
return new Continuation(
iterHolder.get(),
retVal
);
} else {
return null;
}
}
protected abstract static class AbstractReceiver implements Receiver
{
protected final Receiver delegate;
protected final AtomicReference<Iterator<RowsAndColumns>> iterHolder;
protected final List<String> partitionColumns;
public AbstractReceiver(
Receiver delegate,
AtomicReference<Iterator<RowsAndColumns>> iterHolder,
List<String> partitionColumns
)
{
this.delegate = delegate;
this.iterHolder = iterHolder;
this.partitionColumns = partitionColumns;
}
@Override
public Signal push(RowsAndColumns rac)
{
if (rac == null) {
throw DruidException.defensive("Should never get a null rac here.");
}
Iterator<RowsAndColumns> partitionsIter = getIteratorForRAC(rac);
Signal keepItGoing = Signal.GO;
while (keepItGoing == Signal.GO && partitionsIter.hasNext()) {
final RowsAndColumns rowsAndColumns = partitionsIter.next();
keepItGoing = pushPartition(rowsAndColumns, !partitionsIter.hasNext(), keepItGoing);
}
if (keepItGoing == Signal.PAUSE && partitionsIter.hasNext()) {
iterHolder.set(partitionsIter);
return Signal.PAUSE;
}
return keepItGoing;
}
@Override
public void completed()
{
if (iterHolder.get() == null) {
delegate.completed();
}
}
protected Signal pushPartition(RowsAndColumns partition, boolean isLastPartition, Signal previousSignal)
{
return delegate.push(partition);
}
protected abstract Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac);
}
protected abstract HandleContinuationResult handleContinuation(Receiver receiver, Continuation cont);
protected abstract Receiver createReceiver(Receiver delegate, AtomicReference<Iterator<RowsAndColumns>> iterHolder);
protected HandleContinuationResult handleNonGoCases(Signal signal, Iterator<RowsAndColumns> iter, Receiver receiver, Continuation cont)
{
switch (signal) {
case PAUSE:
if (iter.hasNext()) {
return HandleContinuationResult.of(cont);
}
if (cont.subContinuation == null) {
// We were finished anyway
receiver.completed();
return HandleContinuationResult.of(null);
}
return HandleContinuationResult.of(new Continuation(null, cont.subContinuation));
case STOP:
receiver.completed();
try {
cont.close();
}
catch (IOException e) {
throw new RE(e, "Unable to close continuation");
}
return HandleContinuationResult.of(null);
default:
throw new RE("Unknown signal[%s]", signal);
}
}
protected static class Continuation implements Closeable
{
Iterator<RowsAndColumns> iter;
Closeable subContinuation;
public Continuation(Iterator<RowsAndColumns> iter, Closeable subContinuation)
{
this.iter = iter;
this.subContinuation = subContinuation;
}
@Override
public void close() throws IOException
{
if (subContinuation != null) {
subContinuation.close();
}
}
}
/**
* This helper class helps us distinguish whether we need to continue processing or not.
*/
protected static class HandleContinuationResult
{
private final Closeable continuation;
private final boolean continueProcessing;
protected static final HandleContinuationResult CONTINUE_PROCESSING = new HandleContinuationResult(null, true);
private HandleContinuationResult(Closeable continuation, boolean continueProcessing)
{
this.continuation = continuation;
this.continueProcessing = continueProcessing;
}
protected static HandleContinuationResult of(Closeable closeable)
{
return new HandleContinuationResult(closeable, false);
}
private boolean needToContinueProcessing()
{
return continueProcessing;
}
private Closeable getContinuation()
{
return continuation;
}
}
}


@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public abstract class AbstractPartitioningOperatorFactory implements OperatorFactory
{
protected final List<String> partitionColumns;
@JsonCreator
public AbstractPartitioningOperatorFactory(
@JsonProperty("partitionColumns") List<String> partitionColumns
)
{
this.partitionColumns = partitionColumns == null ? new ArrayList<>() : partitionColumns;
}
@JsonProperty("partitionColumns")
public List<String> getPartitionColumns()
{
return partitionColumns;
}
@Override
public abstract Operator wrap(Operator op);
@Override
public boolean validateEquivalent(OperatorFactory other)
{
if (other instanceof AbstractPartitioningOperatorFactory) {
return partitionColumns.equals(((AbstractPartitioningOperatorFactory) other).getPartitionColumns());
}
return false;
}
@Override
public int hashCode()
{
return Objects.hash(partitionColumns);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
AbstractPartitioningOperatorFactory other = (AbstractPartitioningOperatorFactory) obj;
return Objects.equals(partitionColumns, other.partitionColumns);
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{partitionColumns=" + partitionColumns + "}";
}
}


@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import java.io.Closeable;
import java.util.List;
/**
* Base class for sort operators.
*/
public abstract class AbstractSortOperator implements Operator
{
protected final Operator child;
protected final List<ColumnWithDirection> sortColumns;
public AbstractSortOperator(
Operator child,
List<ColumnWithDirection> sortColumns
)
{
this.child = child;
this.sortColumns = sortColumns;
}
@Override
public abstract Closeable goOrContinue(Closeable continuation, Receiver receiver);
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public abstract class AbstractSortOperatorFactory implements OperatorFactory
{
protected final List<ColumnWithDirection> sortColumns;
@JsonCreator
public AbstractSortOperatorFactory(
@JsonProperty("columns") List<ColumnWithDirection> sortColumns
)
{
this.sortColumns = sortColumns;
}
@JsonProperty("columns")
public List<ColumnWithDirection> getSortColumns()
{
return sortColumns;
}
@Override
public abstract Operator wrap(Operator op);
@Override
public boolean validateEquivalent(OperatorFactory other)
{
if (other instanceof AbstractSortOperatorFactory) {
return sortColumns.equals(((AbstractSortOperatorFactory) other).getSortColumns());
}
return false;
}
@Override
public int hashCode()
{
return Objects.hash(sortColumns);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
AbstractSortOperatorFactory other = (AbstractSortOperatorFactory) obj;
return Objects.equals(sortColumns, other.sortColumns);
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{sortColumns=" + sortColumns + "}";
}
}

View File

@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.google.common.base.Preconditions;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReference;
/**
* This glueing partitioning operator is supposed to continuously receive data, and output batches of partitioned RACs.
* It maintains a last-partitioning-boundary of the last-pushed-RAC, and attempts to glue it with the next RAC it receives,
* ensuring that partitions are handled correctly, even across multiple RACs.
* <p>
* Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
*/
public class GlueingPartitioningOperator extends AbstractPartitioningOperator
{
private final int maxRowsMaterialized;
private final AtomicReference<RowsAndColumns> previousRacRef = new AtomicReference<>(null);
private static final Integer MAX_ROWS_MATERIALIZED_NO_LIMIT = Integer.MAX_VALUE;
public GlueingPartitioningOperator(
Operator child,
List<String> partitionColumns
)
{
this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
}
public GlueingPartitioningOperator(
Operator child,
List<String> partitionColumns,
Integer maxRowsMaterialized
)
{
super(partitionColumns, child);
Preconditions.checkNotNull(maxRowsMaterialized, "maxRowsMaterialized cannot be null");
this.maxRowsMaterialized = maxRowsMaterialized;
}
@Override
protected HandleContinuationResult handleContinuation(Receiver receiver, Continuation cont)
{
while (cont.iter.hasNext()) {
RowsAndColumns next = cont.iter.next();
if (!cont.iter.hasNext()) {
// We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
if (cont.subContinuation == null) {
receiver.push(next);
receiver.completed();
return HandleContinuationResult.of(null);
} else {
previousRacRef.set(next);
break;
}
}
final Signal signal = receiver.push(next);
if (signal != Signal.GO) {
return handleNonGoCases(signal, cont.iter, receiver, cont);
}
}
return HandleContinuationResult.CONTINUE_PROCESSING;
}
private static class GlueingReceiver extends AbstractReceiver
{
private final AtomicReference<RowsAndColumns> previousRacRef;
private final int maxRowsMaterialized;
public GlueingReceiver(
Receiver delegate,
AtomicReference<Iterator<RowsAndColumns>> iterHolder,
AtomicReference<RowsAndColumns> previousRacRef,
List<String> partitionColumns,
int maxRowsMaterialized
)
{
super(delegate, iterHolder, partitionColumns);
this.previousRacRef = previousRacRef;
this.maxRowsMaterialized = maxRowsMaterialized;
}
@Override
public Signal push(RowsAndColumns rac)
{
if (rac == null) {
throw DruidException.defensive("Should never get a null rac here.");
}
ensureMaxRowsMaterializedConstraint(rac.numRows(), maxRowsMaterialized);
return super.push(rac);
}
@Override
public void completed()
{
if (previousRacRef.get() != null) {
delegate.push(previousRacRef.get());
previousRacRef.set(null);
}
super.completed();
}
@Override
protected Signal pushPartition(RowsAndColumns partition, boolean isLastPartition, Signal previousSignal)
{
if (isLastPartition) {
// If it's the last partition, save it in previousRac instead of pushing to receiver.
previousRacRef.set(partition);
return previousSignal;
} else {
return super.pushPartition(partition, isLastPartition, previousSignal);
}
}
@Override
protected Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac)
{
return new GluedRACsIterator(rac, previousRacRef, partitionColumns, maxRowsMaterialized);
}
}
/**
* Iterator implementation for gluing partitioned RowsAndColumns.
* It handles the boundaries of partitions within a single RAC and potentially glues
* the first partition of the current RAC with the previous RAC if needed.
*/
private static class GluedRACsIterator implements Iterator<RowsAndColumns>
{
private final RowsAndColumns rac;
private final int[] boundaries;
private int currentIndex = 0;
private final AtomicReference<RowsAndColumns> previousRacRef;
private final int maxRowsMaterialized;
private final List<String> partitionColumns;
public GluedRACsIterator(RowsAndColumns rac, AtomicReference<RowsAndColumns> previousRacRef, List<String> partitionColumns, int maxRowsMaterialized)
{
this.rac = rac;
final ClusteredGroupPartitioner groupPartitioner = ClusteredGroupPartitioner.fromRAC(rac);
this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
this.previousRacRef = previousRacRef;
this.partitionColumns = partitionColumns;
this.maxRowsMaterialized = maxRowsMaterialized;
}
@Override
public boolean hasNext()
{
return currentIndex < boundaries.length - 1;
}
/**
* Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
* it may be glued with the previous RowsAndColumns if the partition columns match.
*
* @return The next RowsAndColumns partition, potentially glued with the previous one.
* @throws NoSuchElementException if there are no more partitions.
*/
@Override
public RowsAndColumns next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
int start = boundaries[currentIndex];
int end = boundaries[currentIndex + 1];
if (previousRacRef.get() != null) {
if (currentIndex != 0) {
throw new ISE("previousRac should be non-null only while handling the first partition boundary.");
}
final RowsAndColumns previousRac = previousRacRef.get();
previousRacRef.set(null);
final LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
final ConcatRowsAndColumns concatRacForFirstPartition = getConcatRacForFirstPartition(previousRac, limitedRAC);
if (isGlueingNeeded(concatRacForFirstPartition)) {
ensureMaxRowsMaterializedConstraint(concatRacForFirstPartition.numRows(), maxRowsMaterialized);
currentIndex++;
return concatRacForFirstPartition;
} else {
return previousRac;
}
}
// If previousRac is null, just return the next partitioned RAC.
currentIndex++;
return new LimitedRowsAndColumns(rac, start, end);
}
/**
* Determines whether glueing is needed between 2 RACs represented as a ConcatRowsAndColumns, by comparing a row belonging to each RAC.
* We do this by comparing the first and last rows of the Concat RAC, as they would belong to the two respective RACs.
* If the columns match, we can glue the 2 RACs and use the ConcatRAC.
* @param rac A {@link ConcatRowsAndColumns} containing the 2 RACs
* @return true if gluing is needed, false otherwise.
*/
private boolean isGlueingNeeded(ConcatRowsAndColumns rac)
{
for (String column : partitionColumns) {
final Column theCol = rac.findColumn(column);
if (theCol == null) {
throw new ISE("Partition column [%s] not found in RAC.", column);
}
final ColumnAccessor accessor = theCol.toAccessor();
int comparison = accessor.compareRows(0, rac.numRows() - 1);
if (comparison != 0) {
return false;
}
}
return true;
}
private ConcatRowsAndColumns getConcatRacForFirstPartition(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
{
if (previousRac == null) {
return new ConcatRowsAndColumns(new ArrayList<>(Collections.singletonList(firstPartitionOfCurrentRac)));
}
return new ConcatRowsAndColumns(new ArrayList<>(Arrays.asList(previousRac, firstPartitionOfCurrentRac)));
}
}
private static void ensureMaxRowsMaterializedConstraint(int numRows, int maxRowsMaterialized)
{
if (numRows > maxRowsMaterialized) {
throw InvalidInput.exception(
"Too many rows to process (requested = %d, max = %d).",
numRows,
maxRowsMaterialized
);
}
}
@Override
protected Receiver createReceiver(Receiver delegate, AtomicReference<Iterator<RowsAndColumns>> iterHolder)
{
return new GlueingReceiver(delegate, iterHolder, previousRacRef, partitionColumns, maxRowsMaterialized);
}
}
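A worked example of the glueing behaviour, written as a sketch in the style of GlueingPartitioningOperatorTest later in this diff (using the makeSingleColumnRac / expectedSingleColumnRac helpers added there): the trailing partition of each RAC is held back and, if its partition key matches the head of the next RAC, the two pieces are concatenated before being pushed downstream.

InlineScanOperator input = InlineScanOperator.make(
    RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 2),
    RowsAndColumnsHelper.makeSingleColumnRac(2, 3)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(input, ImmutableList.of("column"));
new OperatorTestHelper()
    .expectRowsAndColumns(
        RowsAndColumnsHelper.expectedSingleColumnRac(1, 1),
        RowsAndColumnsHelper.expectedSingleColumnRac(2, 2),   // the "2" rows are glued across the two input RACs
        RowsAndColumnsHelper.expectedSingleColumnRac(3)
    )
    .runToCompletion(op);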

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public class GlueingPartitioningOperatorFactory extends AbstractPartitioningOperatorFactory
{
private final Integer maxRowsMaterialized;
@JsonCreator
public GlueingPartitioningOperatorFactory(
@JsonProperty("partitionColumns") List<String> partitionColumns,
@JsonProperty("maxRowsMaterialized") Integer maxRowsMaterialized
)
{
super(partitionColumns);
this.maxRowsMaterialized = maxRowsMaterialized;
}
@JsonProperty("maxRowsMaterialized")
public Integer getMaxRowsMaterialized()
{
return maxRowsMaterialized;
}
@Override
public Operator wrap(Operator op)
{
return new GlueingPartitioningOperator(op, partitionColumns, maxRowsMaterialized);
}
@Override
public boolean validateEquivalent(OperatorFactory other)
{
if (!super.validateEquivalent(other)) {
return false;
}
if (!(other instanceof GlueingPartitioningOperatorFactory)) {
return false;
}
return Objects.equals(maxRowsMaterialized, ((GlueingPartitioningOperatorFactory) other).getMaxRowsMaterialized());
}
@Override
public String toString()
{
return "GlueingPartitioningOperatorFactory{" +
"partitionColumns=" + partitionColumns +
"maxRowsMaterialized=" + maxRowsMaterialized +
'}';
}
@Override
public final int hashCode()
{
return Objects.hash(partitionColumns, maxRowsMaterialized);
}
@Override
public final boolean equals(Object obj)
{
return super.equals(obj) &&
Objects.equals(maxRowsMaterialized, ((GlueingPartitioningOperatorFactory) obj).getMaxRowsMaterialized());
}
}

View File

@ -19,14 +19,9 @@
package org.apache.druid.query.operator;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@ -40,137 +35,50 @@ import java.util.concurrent.atomic.AtomicReference;
* Additionally, this assumes that data has been pre-sorted according to the partitioning columns. If it is
* given data that has not been pre-sorted, an exception is expected to be thrown.
*/
public class NaivePartitioningOperator implements Operator
public class NaivePartitioningOperator extends AbstractPartitioningOperator
{
private final List<String> partitionColumns;
private final Operator child;
public NaivePartitioningOperator(
List<String> partitionColumns,
Operator child
)
{
this.partitionColumns = partitionColumns;
this.child = child;
super(partitionColumns, child);
}
@Override
public Closeable goOrContinue(Closeable continuation, Receiver receiver)
protected HandleContinuationResult handleContinuation(Receiver receiver, Continuation cont)
{
if (continuation != null) {
Continuation cont = (Continuation) continuation;
if (cont.iter != null) {
while (cont.iter.hasNext()) {
final Signal signal = receiver.push(cont.iter.next());
switch (signal) {
case GO:
break;
case PAUSE:
if (cont.iter.hasNext()) {
return cont;
}
if (cont.subContinuation == null) {
// We were finished anyway
receiver.completed();
return null;
}
return new Continuation(null, cont.subContinuation);
case STOP:
receiver.completed();
try {
cont.close();
}
catch (IOException e) {
throw new RE(e, "Unable to close continutation");
}
return null;
default:
throw new RE("Unknown signal[%s]", signal);
}
}
if (cont.subContinuation == null) {
receiver.completed();
return null;
}
while (cont.iter.hasNext()) {
final Signal signal = receiver.push(cont.iter.next());
if (signal != Signal.GO) {
return handleNonGoCases(signal, cont.iter, receiver, cont);
}
continuation = cont.subContinuation;
}
AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
final Closeable retVal = child.goOrContinue(
continuation,
new Receiver()
{
@Override
public Signal push(RowsAndColumns rac)
{
if (rac == null) {
throw DruidException.defensive("Should never get a null rac here.");
}
ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
if (groupPartitioner == null) {
groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
}
Iterator<RowsAndColumns> partitionsIter =
groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
Signal keepItGoing = Signal.GO;
while (keepItGoing == Signal.GO && partitionsIter.hasNext()) {
keepItGoing = receiver.push(partitionsIter.next());
}
if (keepItGoing == Signal.PAUSE && partitionsIter.hasNext()) {
iterHolder.set(partitionsIter);
return Signal.PAUSE;
}
return keepItGoing;
}
@Override
public void completed()
{
if (iterHolder.get() == null) {
receiver.completed();
}
}
}
);
if (iterHolder.get() != null || retVal != null) {
return new Continuation(
iterHolder.get(),
retVal
);
} else {
return null;
}
return HandleContinuationResult.CONTINUE_PROCESSING;
}
private static class Continuation implements Closeable
private static class NaiveReceiver extends AbstractReceiver
{
Iterator<RowsAndColumns> iter;
Closeable subContinuation;
public Continuation(Iterator<RowsAndColumns> iter, Closeable subContinuation)
public NaiveReceiver(
Receiver delegate,
AtomicReference<Iterator<RowsAndColumns>> iterHolder,
List<String> partitionColumns
)
{
this.iter = iter;
this.subContinuation = subContinuation;
super(delegate, iterHolder, partitionColumns);
}
@Override
public void close() throws IOException
protected Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac)
{
if (subContinuation != null) {
subContinuation.close();
}
final ClusteredGroupPartitioner groupPartitioner = ClusteredGroupPartitioner.fromRAC(rac);
return groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
}
}
@Override
protected Receiver createReceiver(Receiver delegate, AtomicReference<Iterator<RowsAndColumns>> iterHolder)
{
return new NaiveReceiver(delegate, iterHolder, partitionColumns);
}
}

View File

@ -22,26 +22,16 @@ package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class NaivePartitioningOperatorFactory implements OperatorFactory
public class NaivePartitioningOperatorFactory extends AbstractPartitioningOperatorFactory
{
private final List<String> partitionColumns;
@JsonCreator
public NaivePartitioningOperatorFactory(
@JsonProperty("partitionColumns") List<String> partitionColumns
)
{
this.partitionColumns = partitionColumns == null ? new ArrayList<>() : partitionColumns;
}
@JsonProperty("partitionColumns")
public List<String> getPartitionColumns()
{
return partitionColumns;
super(partitionColumns);
}
@Override
@ -49,40 +39,4 @@ public class NaivePartitioningOperatorFactory implements OperatorFactory
{
return new NaivePartitioningOperator(partitionColumns, op);
}
@Override
public boolean validateEquivalent(OperatorFactory other)
{
if (other instanceof NaivePartitioningOperatorFactory) {
return partitionColumns.equals(((NaivePartitioningOperatorFactory) other).getPartitionColumns());
}
return false;
}
@Override
public String toString()
{
return "NaivePartitioningOperatorFactory{" +
"partitionColumns=" + partitionColumns +
'}';
}
@Override
public final int hashCode()
{
return Objects.hash(partitionColumns);
}
@Override
public final boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
NaivePartitioningOperatorFactory other = (NaivePartitioningOperatorFactory) obj;
return Objects.equals(partitionColumns, other.partitionColumns);
}
}

View File

@ -31,18 +31,14 @@ import java.util.List;
* that it has to accumulate all of the data of its child operator first before it can sort. This limitation
* means that hopefully this operator is only planned in a very small number of circumstances.
*/
public class NaiveSortOperator implements Operator
public class NaiveSortOperator extends AbstractSortOperator
{
private final Operator child;
private final List<ColumnWithDirection> sortColumns;
public NaiveSortOperator(
Operator child,
List<ColumnWithDirection> sortColumns
)
{
this.child = child;
this.sortColumns = sortColumns;
super(child, sortColumns);
}
@Override

View File

@ -23,24 +23,15 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
public class NaiveSortOperatorFactory implements OperatorFactory
public class NaiveSortOperatorFactory extends AbstractSortOperatorFactory
{
private final List<ColumnWithDirection> sortColumns;
@JsonCreator
public NaiveSortOperatorFactory(
@JsonProperty("columns") List<ColumnWithDirection> sortColumns
)
{
this.sortColumns = sortColumns;
}
@JsonProperty("columns")
public List<ColumnWithDirection> getSortColumns()
{
return sortColumns;
super(sortColumns);
}
@Override
@ -48,38 +39,4 @@ public class NaiveSortOperatorFactory implements OperatorFactory
{
return new NaiveSortOperator(op, sortColumns);
}
@Override
public boolean validateEquivalent(OperatorFactory other)
{
if (other instanceof NaiveSortOperatorFactory) {
return sortColumns.equals(((NaiveSortOperatorFactory) other).getSortColumns());
}
return false;
}
@Override
public int hashCode()
{
return Objects.hash(sortColumns);
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
NaiveSortOperatorFactory other = (NaiveSortOperatorFactory) obj;
return Objects.equals(sortColumns, other.sortColumns);
}
@Override
public String toString()
{
return "NaiveSortOperatorFactory{sortColumns=" + sortColumns + "}";
}
}

View File

@ -68,7 +68,7 @@ public interface Operator
* to indicate its degree of readiness for more data to be received.
* <p>
* If a Receiver returns a {@link Signal#PAUSE} signal, then if there is processing left to do, then it is expected
* that a non-null "continuation" object nwill be returned. This allows for flow control to be returned to the
* that a non-null "continuation" object will be returned. This allows for flow control to be returned to the
* caller to, e.g., process another Operator or just exert backpressure. In this case, when the controller wants to
* resume, it must call this method again and include the continuation object that it received.
* <p>
@ -99,7 +99,7 @@ public interface Operator
* if there is any state that an Operator requires to be able to resume its processing, then it is expected that the
* Operator will cast the object back to an instance of the type that it had originally returned.
*
* @param receiver a receiver that will receiver data
* @param receiver a receiver that will receive data
* @return null if processing is complete, non-null if the Receiver returned a {@link Signal#PAUSE} signal
*/
@Nullable

View File

@ -31,7 +31,9 @@ import org.apache.druid.query.operator.window.WindowOperatorFactory;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "naivePartition", value = NaivePartitioningOperatorFactory.class),
@JsonSubTypes.Type(name = "glueingPartition", value = GlueingPartitioningOperatorFactory.class),
@JsonSubTypes.Type(name = "naiveSort", value = NaiveSortOperatorFactory.class),
@JsonSubTypes.Type(name = "partitionSort", value = PartitionSortOperatorFactory.class),
@JsonSubTypes.Type(name = "window", value = WindowOperatorFactory.class),
@JsonSubTypes.Type(name = "scan", value = ScanOperatorFactory.class),
})
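With these registrations, the new factories are addressable by name in serialized operator specs. A minimal sketch of the round trip, under the assumption that a plain Jackson ObjectMapper suffices because the type information is declared on OperatorFactory itself; the column name is hypothetical:

// Needs com.fasterxml.jackson.databind.ObjectMapper; e.g. inside a test method declaring "throws Exception".
ObjectMapper mapper = new ObjectMapper();
String json = "{\"type\":\"glueingPartition\",\"partitionColumns\":[\"d0\"],\"maxRowsMaterialized\":100000}";
OperatorFactory factory = mapper.readValue(json, OperatorFactory.class);
// factory is a GlueingPartitioningOperatorFactory; "partitionSort" resolves to
// PartitionSortOperatorFactory the same way, taking a "columns" list of ColumnWithDirection.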

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
/**
* This operator sorts rows inside partitioned RACs, on the sort columns.
* This operator expects to receive a "complete" partition of data. Each input RAC is expected to be a separate partition.
*/
public class PartitionSortOperator extends AbstractSortOperator
{
public PartitionSortOperator(
Operator child,
List<ColumnWithDirection> sortColumns
)
{
super(child, sortColumns);
}
@Override
public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
return child.goOrContinue(
continuation,
new Receiver()
{
@Override
public Signal push(RowsAndColumns rac)
{
NaiveSortMaker.NaiveSorter sorter = NaiveSortMaker.fromRAC(rac).make(new ArrayList<>(sortColumns));
receiver.push(sorter.complete());
return Signal.GO;
}
@Override
public void completed()
{
receiver.completed();
}
}
);
}
}
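A hedged sketch (not from the patch) of how the two new operators are intended to compose in the window layer: partition first, then sort each glued partition on the window's ordering columns. The column names are hypothetical and child stands for whatever upstream operator supplies the rows:

Operator partitioned = new GlueingPartitioningOperator(child, ImmutableList.of("d0"));
Operator sorted = new PartitionSortOperator(
    partitioned,
    ImmutableList.of(new ColumnWithDirection("d1", ColumnWithDirection.Direction.ASC))
);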

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
public class PartitionSortOperatorFactory extends AbstractSortOperatorFactory
{
@JsonCreator
public PartitionSortOperatorFactory(
@JsonProperty("columns") List<ColumnWithDirection> sortColumns
)
{
super(sortColumns);
}
@Override
public Operator wrap(Operator op)
{
return new PartitionSortOperator(op, sortColumns);
}
}

View File

@ -77,6 +77,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
private OffsetLimit limit;
private LinkedHashSet<String> viewableColumns;
private List<ColumnWithDirection> ordering;
private final Integer allocatorCapacity;
public LazilyDecoratedRowsAndColumns(
RowsAndColumns base,
@ -85,7 +86,8 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
VirtualColumns virtualColumns,
OffsetLimit limit,
List<ColumnWithDirection> ordering,
LinkedHashSet<String> viewableColumns
LinkedHashSet<String> viewableColumns,
Long allocatorCapacity
)
{
this.base = base;
@ -95,6 +97,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
this.limit = limit;
this.ordering = ordering;
this.viewableColumns = viewableColumns;
this.allocatorCapacity = allocatorCapacity != null ? allocatorCapacity.intValue() : 200 << 20;
}
@Override
@ -268,7 +271,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
}
final FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
new ArenaMemoryAllocatorFactory(200 << 20), // 200 MB, because, why not?
new ArenaMemoryAllocatorFactory(allocatorCapacity),
signature,
sortColumns
);
@ -367,8 +370,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
// is being left as an exercise for the future.
final RowSignature.Builder sigBob = RowSignature.builder();
final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20);
final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(allocatorCapacity);
for (String column : columnsToGenerate) {
final Column racColumn = rac.findColumn(column);
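The hard-coded frame allocator size above is now injectable, with the old value kept as the default when nothing is passed. For reference, the default works out as follows (a worked value, not code from the patch):

int defaultCapacity = 200 << 20;   // 200 * 1,048,576 = 209,715,200 bytes, i.e. 200 MiB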

View File

@ -20,7 +20,6 @@
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
@ -29,7 +28,6 @@ import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
@ -130,16 +128,7 @@ public class RearrangedRowsAndColumns implements RowsAndColumns
@Override
public Object getObject(int rowNum)
{
Object value = accessor.getObject(pointers[start + rowNum]);
if (ColumnType.STRING.equals(getType()) && value instanceof List) {
// special handling to reject MVDs
throw new UOE(
"Encountered a multi value column [%s]. Window processing does not support MVDs. "
+ "Consider using UNNEST or MV_TO_ARRAY.",
name
);
}
return value;
return accessor.getObject(pointers[start + rowNum]);
}
@Override

View File

@ -133,7 +133,8 @@ public class DefaultRowsAndColumnsDecorator implements RowsAndColumnsDecorator
virtualColumns,
offsetLimit,
ordering,
columns == null ? null : new LinkedHashSet<>(columns)
columns == null ? null : new LinkedHashSet<>(columns),
null
);
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class GlueingPartitioningOperatorFactoryTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(GlueingPartitioningOperatorFactory.class)
.usingGetClass()
.verify();
}
}

View File

@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class GlueingPartitioningOperatorTest
{
@Test
public void testPartitioning()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column")
);
new OperatorTestHelper()
.expectRowsAndColumns(
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2, 2),
RowsAndColumnsHelper.expectedSingleColumnRac(1)
)
.runToCompletion(op);
}
@Test
public void testPartitioningWithMultipleRACs()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1),
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1),
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 2, 2, 1)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column")
);
new OperatorTestHelper()
.expectRowsAndColumns(
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2, 2),
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2, 2),
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2, 2),
RowsAndColumnsHelper.expectedSingleColumnRac(1)
)
.runToCompletion(op);
}
@Test
public void testPartitioningWithMultipleConcatenationBetweenRACs()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1),
RowsAndColumnsHelper.makeSingleColumnRac(1, 1),
RowsAndColumnsHelper.makeSingleColumnRac(1, 2)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column")
);
new OperatorTestHelper()
.expectRowsAndColumns(
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1, 1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2)
)
.runToCompletion(op);
}
@Test
public void testPartitioningWithNoGlueing()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 2, 3),
RowsAndColumnsHelper.makeSingleColumnRac(4, 5, 6),
RowsAndColumnsHelper.makeSingleColumnRac(7, 8, 9)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column")
);
new OperatorTestHelper()
.expectRowsAndColumns(
RowsAndColumnsHelper.expectedSingleColumnRac(1),
RowsAndColumnsHelper.expectedSingleColumnRac(2),
RowsAndColumnsHelper.expectedSingleColumnRac(3),
RowsAndColumnsHelper.expectedSingleColumnRac(4),
RowsAndColumnsHelper.expectedSingleColumnRac(5),
RowsAndColumnsHelper.expectedSingleColumnRac(6),
RowsAndColumnsHelper.expectedSingleColumnRac(7),
RowsAndColumnsHelper.expectedSingleColumnRac(8),
RowsAndColumnsHelper.expectedSingleColumnRac(9)
)
.runToCompletion(op);
}
@Test
public void testPartitioningWithNoPartitionColumns()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1),
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
Collections.emptyList()
);
new OperatorTestHelper()
.expectRowsAndColumns(
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 2, 1)
)
.runToCompletion(op);
}
@Test
public void testMaxRowsConstraintViolation()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column"),
2
);
Assert.assertThrows(
"Too many rows to process (requested = 3, max = 2).",
DruidException.class,
() -> new OperatorTestHelper().expectRowsAndColumns().runToCompletion(op)
);
}
@Test
public void testMaxRowsConstraintViolationWhenGlueing()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1),
RowsAndColumnsHelper.makeSingleColumnRac(1, 2, 3)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column"),
3
);
Assert.assertThrows(
"Too many rows to process (requested = 4, max = 3).",
DruidException.class,
() -> new OperatorTestHelper().expectRowsAndColumns().runToCompletion(op)
);
}
@Test
public void testMaxRowsConstraintWhenGlueing()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1),
RowsAndColumnsHelper.makeSingleColumnRac(2, 2, 2)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column"),
3
);
new OperatorTestHelper()
.expectRowsAndColumns(
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2, 2, 2)
)
.runToCompletion(op);
}
@Test
public void testStopMidStream()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1)
);
GlueingPartitioningOperator op = new GlueingPartitioningOperator(
inlineScanOperator,
ImmutableList.of("column")
);
new OperatorTestHelper()
.expectAndStopAfter(
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2, 2)
)
.runToCompletion(op);
}
}

View File

@ -32,38 +32,46 @@ import java.util.function.BiFunction;
public class NaivePartitioningOperatorTest
{
@Test
public void testDefaultImplementation()
public void testPartitioning()
{
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(0, 0, 0, 1, 1, 2, 4, 4, 4)
);
NaivePartitioningOperator op = new NaivePartitioningOperator(
ImmutableList.of("sorted"),
InlineScanOperator.make(rac)
ImmutableList.of("column"),
inlineScanOperator
);
new OperatorTestHelper()
.expectRowsAndColumns(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{2})
.expectColumn("unsorted", new int[]{54})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{4, 4, 4})
.expectColumn("unsorted", new int[]{2, 3, 92})
.allColumnsRegistered()
RowsAndColumnsHelper.expectedSingleColumnRac(0, 0, 0),
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(2),
RowsAndColumnsHelper.expectedSingleColumnRac(4, 4, 4)
)
.runToCompletion(op);
}
@Test
public void testPartitioningWithMultipleRACs()
{
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(0, 0, 0, 1, 1),
RowsAndColumnsHelper.makeSingleColumnRac(1, 2, 2, 2)
);
NaivePartitioningOperator op = new NaivePartitioningOperator(
ImmutableList.of("column"),
inlineScanOperator
);
new OperatorTestHelper()
.expectRowsAndColumns(
RowsAndColumnsHelper.expectedSingleColumnRac(0, 0, 0),
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1),
RowsAndColumnsHelper.expectedSingleColumnRac(1),
RowsAndColumnsHelper.expectedSingleColumnRac(2, 2, 2)
)
.runToCompletion(op);
}
@ -71,26 +79,19 @@ public class NaivePartitioningOperatorTest
@Test
public void testStopMidStream()
{
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
InlineScanOperator inlineScanOperator = InlineScanOperator.make(
RowsAndColumnsHelper.makeSingleColumnRac(0, 0, 0, 1, 1, 2, 4, 4, 4)
);
NaivePartitioningOperator op = new NaivePartitioningOperator(
ImmutableList.of("sorted"),
InlineScanOperator.make(rac)
ImmutableList.of("column"),
inlineScanOperator
);
new OperatorTestHelper()
.expectAndStopAfter(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5})
RowsAndColumnsHelper.expectedSingleColumnRac(0, 0, 0),
RowsAndColumnsHelper.expectedSingleColumnRac(1, 1)
)
.runToCompletion(op);
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class PartitionSortOperatorFactoryTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(PartitionSortOperatorFactory.class)
.usingGetClass()
.verify();
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Test;
public class PartitionSortOperatorTest
{
@Test
public void testDefaultImplementation()
{
RowsAndColumns rac1 = MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{1, 1, 1, 2, 2, 1}),
"unsorted", new IntArrayColumn(new int[]{10, 10, 10, 20, 20, 11})
)
);
InlineScanOperator inlineScanOperator = InlineScanOperator.make(rac1);
PartitionSortOperator op = new PartitionSortOperator(
inlineScanOperator,
ImmutableList.of(new ColumnWithDirection("unsorted", ColumnWithDirection.Direction.ASC))
);
new OperatorTestHelper()
.expectRowsAndColumns(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1, 1, 1, 2, 2})
.expectColumn("unsorted", new int[]{10, 10, 10, 11, 20, 20})
.allColumnsRegistered()
)
.runToCompletion(op);
}
}

View File

@ -19,13 +19,16 @@
package org.apache.druid.query.operator.window;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
@ -129,6 +132,18 @@ public class RowsAndColumnsHelper
return this;
}
public static RowsAndColumns makeSingleColumnRac(int... values)
{
return MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of("column", new IntArrayColumn(values))
);
}
public static RowsAndColumnsHelper expectedSingleColumnRac(int... values)
{
return new RowsAndColumnsHelper().expectColumn("column", values).allColumnsRegistered();
}
public ColumnHelper columnHelper(String column, int expectedSize, ColumnType expectedType)
{
if (this.expectedSize.get() == null) {

View File

@ -44,6 +44,7 @@ public class ColumnBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase
null,
OffsetLimit.limit(Integer.MAX_VALUE),
null,
null,
null
);

View File

@ -89,6 +89,7 @@ public class EvaluateRowsAndColumnsTest extends SemanticTestBase
TestExprMacroTable.INSTANCE)),
OffsetLimit.NONE,
null,
null,
null);
// do the materialization