MSQ WF: Batch multiple PARTITION BY keys for processing (#16823)

Currently, if we have a query with window function having PARTITION BY xyz, and we have a million unique values for xyz each having 1 row, we'd end up creating a million individual RACs for processing, each having a single row. This is unnecessary, and we can batch the PARTITION BY keys together for processing, and process them only when we can't batch further rows to adhere to maxRowsMaterialized config.

The previous iteration of this PR was simplifying WindowOperatorQueryFrameProcessor to run all operators on all the rows instead of creating smaller RACs per partition by key. That approach was discarded in favor of the batching approach, and the details are summarized here: #16823 (comment).
This commit is contained in:
Akshat Jain 2024-08-28 11:32:47 +05:30 committed by GitHub
parent 862ccda59b
commit fbd305af0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 509 additions and 290 deletions

View File

@ -80,7 +80,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final WritableFrameChannel outputChannel;
private final FrameWriterFactory frameWriterFactory;
private final FrameReader frameReader;
private final ArrayList<ResultRow> objectsOfASingleRac;
private final int maxRowsMaterialized;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
@ -95,6 +94,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
// Type strategies are pushed in the same order as column types in frameReader.signature()
private final NullableTypeStrategy[] typeStrategies;
private final ArrayList<ResultRow> rowsToProcess;
private int lastPartitionIndex = -1;
public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
@ -116,7 +118,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.query = query;
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.rowsToProcess = new ArrayList<>();
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
@ -153,188 +155,117 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
{
/*
*
* PARTITION BY A ORDER BY B
*
* Frame 1 -> rac1
* A B
* 1, 2
* 1, 3
* 2, 1 --> key changed
* 2, 2
*
*
* Frame 2 -> rac2
* 3, 1 --> key changed
* 3, 2
* 3, 3
* 3, 4
*
* Frame 3 -> rac3
*
* 3, 5
* 3, 6
* 4, 1 --> key changed
* 4, 2
*
* In case of empty OVER clause, all these racs need to be added to a single rows and columns
* to be processed. The way we can do this is to use a ConcatRowsAndColumns
* ConcatRC [rac1, rac2, rac3]
* Run all ops on this
*
*
* The flow would look like:
* 1. Validate if the operator doesn't have any OVER() clause with PARTITION BY for this stage.
* 2. If 1 is true make a giant rows and columns (R&C) using concat as shown above
* Let all operators run amok on that R&C
* 3. If 1 is false
* Read a frame
* keep the older row in a class variable
* check row by row and compare current with older row to check if partition boundary is reached
* when frame partition by changes
* create R&C for those particular set of columns, they would have the same partition key
* output will be a single R&C
* write to output channel
*
*
* Future thoughts: {@link https://github.com/apache/druid/issues/16126}
*
* 1. We are writing 1 partition to each frame in this way. In case of high cardinality data
* we will be making a large number of small frames. We can have a check to keep size of frame to a value
* say 20k rows and keep on adding to the same pending frame and not create a new frame
*
* 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data
* with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data.
* We might think to reimplement them in the MSQ way so that we do not have to materialize so much data
There are 2 scenarios:
*** Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY ***
In this scenario, we add all the RACs to a single RowsAndColumns to be processed. We do it via ConcatRowsAndColumns, and run all the operators on the ConcatRowsAndColumns.
This is done because we anyway need to run the operators on the entire set of rows when we have an OVER() clause without a PARTITION BY.
This scenario corresponds to partitionColumnNames.isEmpty()=true code flow.
*** Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY ***
In this scenario, we need to process rows for each PARTITION BY group together, but we can batch multiple PARTITION BY keys into the same RAC before passing it to the operators for processing.
Batching is fine since the operators list would have the required NaivePartitioningOperatorFactory to segregate each PARTITION BY group during the processing.
The flow for this scenario can be summarised as following:
1. Frame Reading and Cursor Initialization: We start by reading a frame from the inputChannel and initializing frameCursor to iterate over the rows in that frame.
2. Row Comparison: For each row in the frame, we decide whether it belongs to the same PARTITION BY group as the previous row.
This is determined by comparePartitionKeys() method.
Please refer to the Javadoc of that method for further details and an example illustration.
2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row,
they belong to the same PARTITION BY group, and gets added to rowsToProcess.
If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processRowsUpToLastPartition() method.
2.2. If they don't match, then we have reached a partition boundary.
In this case, we update the value for lastPartitionIndex.
3. End of Input: If the input channel is finished, any remaining rows in rowsToProcess are processed.
*Illustration of Row Comparison step*
Let's say we have window_function() OVER (PARTITION BY A ORDER BY B) in our query, and we get 3 frames in the input channel to process.
Frame 1
A, B
1, 2
1, 3
2, 1 --> PARTITION BY key (column A) changed from 1 to 2.
2, 2
Frame 2
A, B
3, 1 --> PARTITION BY key (column A) changed from 2 to 3.
3, 2
3, 3
3, 4
Frame 3
A, B
3, 5
3, 6
4, 1 --> PARTITION BY key (column A) changed from 3 to 4.
4, 2
*Why batching?*
We batch multiple PARTITION BY keys for processing together to avoid the overhead of creating different RACs for each PARTITION BY keys, as that would be unnecessary in scenarios where we have a large number of PARTITION BY keys, but each key having a single row.
*Future thoughts: https://github.com/apache/druid/issues/16126*
Current approach with R&C and operators materialize a single R&C for processing. In case of data with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause.
Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. We might think to reimplement them in the MSQ way so that we do not have to materialize so much data.
*/
if (partitionColumnNames.isEmpty()) {
// If we do not have any OVER() clause with PARTITION BY for this stage.
// Bring all data to a single executor for processing.
// Convert each frame to RAC.
// Concatenate all the racs to make a giant RAC.
// Let all operators run on the giant RAC until channel is finished.
// Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY.
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
convertRowFrameToRowsAndColumns(frame);
return ReturnOrAwait.runAgain();
} else if (inputChannel.isFinished()) {
runAllOpsOnMultipleRac(frameRowsAndCols);
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
return ReturnOrAwait.runAgain();
} else {
// Aha, you found a PARTITION BY and maybe ORDER BY TO
// PARTITION BY can also be on multiple keys
// typically the last stage would already partition and sort for you
// figure out frame boundaries and convert each distinct group to a rac
// then run the windowing operator only on each rac
if (frameCursor == null || frameCursor.isDone()) {
if (readableInputs.isEmpty()) {
return ReturnOrAwait.awaitAll(1);
} else if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
for (int i = 0; i < fieldSuppliers.length; i++) {
final ColumnValueSelector<?> selector =
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
fieldSuppliers[i] = selector::getObject;
}
}
rowSupplierFromFrameCursor = () -> {
final ResultRow row = ResultRow.create(fieldSuppliers.length);
for (int i = 0; i < fieldSuppliers.length; i++) {
row.set(i, fieldSuppliers[i].get());
}
return row;
};
} else if (inputChannel.isFinished()) {
// reaached end of channel
// if there is data remaining
// write it into a rac
// and run operators on it
if (!objectsOfASingleRac.isEmpty()) {
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(objectsOfASingleRac.size(), maxRowsMaterialized));
}
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
objectsOfASingleRac,
frameReader.signature()
);
runAllOpsOnSingleRac(rac);
objectsOfASingleRac.clear();
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
}
}
while (!frameCursor.isDone()) {
final ResultRow currentRow = rowSupplierFromFrameCursor.get();
if (outputRow == null) {
outputRow = currentRow;
objectsOfASingleRac.add(currentRow);
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// if they have the same partition key
// keep adding them after checking
// guardrails
objectsOfASingleRac.add(currentRow);
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
objectsOfASingleRac.size(),
maxRowsMaterialized
));
}
} else {
// key change noted
// create rac from the rows seen before
// run the operators on these rows and columns
// clean up the object to hold the new rows only
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
objectsOfASingleRac.size(),
maxRowsMaterialized
));
}
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
objectsOfASingleRac,
frameReader.signature()
);
runAllOpsOnSingleRac(rac);
objectsOfASingleRac.clear();
outputRow = currentRow.copy();
return ReturnOrAwait.runAgain();
}
frameCursor.advance();
// Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY
if (frameCursor == null || frameCursor.isDone()) {
if (readableInputs.isEmpty()) {
return ReturnOrAwait.awaitAll(1);
} else if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
makeRowSupplierFromFrameCursor();
} else if (inputChannel.isFinished()) {
// Handle any remaining data.
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
}
}
return ReturnOrAwait.runAgain();
}
/**
* @param singleRac Use this {@link RowsAndColumns} as a single input for the operators to be run
*/
private void runAllOpsOnSingleRac(RowsAndColumns singleRac)
{
Operator op = new Operator()
{
@Nullable
@Override
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
{
receiver.push(singleRac);
if (singleRac.numRows() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(singleRac.numRows(), maxRowsMaterialized));
while (!frameCursor.isDone()) {
final ResultRow currentRow = rowSupplierFromFrameCursor.get();
if (outputRow == null) {
outputRow = currentRow;
rowsToProcess.add(currentRow);
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// Add current row to the same batch of rows for processing.
rowsToProcess.add(currentRow);
if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
}
receiver.completed();
return null;
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
} else {
lastPartitionIndex = rowsToProcess.size() - 1;
outputRow = currentRow.copy();
return ReturnOrAwait.runAgain();
}
};
runOperatorsAfterThis(op);
frameCursor.advance();
}
return ReturnOrAwait.runAgain();
}
/**
@ -349,9 +280,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
{
RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs);
if (rac.numRows() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(rac.numRows(), maxRowsMaterialized));
}
ensureMaxRowsInAWindowConstraint(rac.numRows());
receiver.push(rac);
receiver.completed();
return null;
@ -496,12 +425,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
null
);
// check if existing + newly added rows exceed guardrails
if (frameRowsAndCols.size() + ldrc.numRows() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
frameRowsAndCols.size() + ldrc.numRows(),
maxRowsMaterialized
));
}
ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows());
frameRowsAndCols.add(ldrc);
}
@ -533,4 +457,57 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
}
return match == partitionColumnNames.size();
}
private void makeRowSupplierFromFrameCursor()
{
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
for (int i = 0; i < fieldSuppliers.length; i++) {
final ColumnValueSelector<?> selector =
frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i));
fieldSuppliers[i] = selector::getObject;
}
rowSupplierFromFrameCursor = () -> {
final ResultRow row = ResultRow.create(fieldSuppliers.length);
for (int i = 0; i < fieldSuppliers.length; i++) {
row.set(i, fieldSuppliers[i].get());
}
return row;
};
}
/**
* Process rows from rowsToProcess[0, lastPartitionIndex].
*/
private void processRowsUpToLastPartition()
{
if (lastPartitionIndex == -1) {
return;
}
RowsAndColumns singleRac = MapOfColumnsRowsAndColumns.fromResultRowTillIndex(
rowsToProcess,
frameReader.signature(),
lastPartitionIndex
);
ArrayList<RowsAndColumns> rowsAndColumns = new ArrayList<>();
rowsAndColumns.add(singleRac);
runAllOpsOnMultipleRac(rowsAndColumns);
// Remove elements in the range [0, lastPartitionIndex] from the list.
// The call to list.subList(a, b).clear() deletes the elements in the range [a, b - 1],
// causing the remaining elements to shift and start from index 0.
rowsToProcess.subList(0, lastPartitionIndex + 1).clear();
lastPartitionIndex = -1;
}
private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow)
{
if (numRowsInWindow > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
numRowsInWindow,
maxRowsMaterialized
));
}
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.querykit;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class FrameProcessorTestBase extends InitializedNullHandlingTest
{
protected static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0);
protected FrameProcessorExecutor exec;
@Before
public void setUp()
{
exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
}
@After
public void tearDown() throws Exception
{
exec.getExecutorService().shutdownNow();
exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
}
protected ReadableInput makeChannelFromAdapter(
final StorageAdapter adapter,
final List<KeyColumn> keyColumns,
int rowsPerInputFrame
) throws IOException
{
// Create a single, sorted frame.
final FrameSequenceBuilder singleFrameBuilder =
FrameSequenceBuilder.fromAdapter(adapter)
.frameType(FrameType.ROW_BASED)
.maxRowsPerFrame(Integer.MAX_VALUE)
.sortBy(keyColumns);
final RowSignature signature = singleFrameBuilder.signature();
final Frame frame = Iterables.getOnlyElement(singleFrameBuilder.frames().toList());
// Split it up into frames that match rowsPerFrame. Set max size enough to hold all rows we might ever want to use.
final BlockingQueueFrameChannel channel = new BlockingQueueFrameChannel(10_000);
final FrameReader frameReader = FrameReader.create(signature);
final FrameSequenceBuilder frameSequenceBuilder =
FrameSequenceBuilder.fromAdapter(new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY))
.frameType(FrameType.ROW_BASED)
.maxRowsPerFrame(rowsPerInputFrame);
final Sequence<Frame> frames = frameSequenceBuilder.frames();
frames.forEach(
f -> {
try {
channel.writable().write(f);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
);
channel.writable().close();
return ReadableInput.channel(channel.readable(), FrameReader.create(signature), STAGE_PARTITION);
}
}

View File

@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.querykit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.allocation.HeapMemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.indexing.CountingWritableFrameChannel;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBase
{
@Test
public void testBatchingOfPartitionByKeys_singleBatch() throws Exception
{
// With maxRowsMaterialized=100, we will get 1 frame:
// [1, 1, 2, 2, 2, 3, 3]
validateBatching(100, 1);
}
@Test
public void testBatchingOfPartitionByKeys_multipleBatches_1() throws Exception
{
// With maxRowsMaterialized=5, we will get 2 frames:
// [1, 1, 2, 2, 2]
// [3, 3]
validateBatching(5, 2);
}
@Test
public void testBatchingOfPartitionByKeys_multipleBatches_2() throws Exception
{
// With maxRowsMaterialized=4, we will get 3 frames:
// [1, 1]
// [2, 2, 2]
// [3, 3]
validateBatching(4, 3);
}
@Test
public void testBatchingOfPartitionByKeys_TooManyRowsInAWindowFault()
{
final RuntimeException e = Assert.assertThrows(
RuntimeException.class,
() -> validateBatching(2, 3)
);
MatcherAssert.assertThat(
((MSQException) e.getCause().getCause()).getFault(),
CoreMatchers.instanceOf(TooManyRowsInAWindowFault.class)
);
Assert.assertTrue(e.getMessage().contains("TooManyRowsInAWindow: Too many rows in a window (requested = 3, max = 2)"));
}
public void validateBatching(int maxRowsMaterialized, int numFramesWritten) throws Exception
{
final ReadableInput factChannel = buildWindowTestInputChannel();
RowSignature inputSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();
FrameReader frameReader = FrameReader.create(inputSignature);
RowSignature outputSignature = RowSignature.builder()
.addAll(inputSignature)
.add("w0", ColumnType.LONG)
.build();
final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
ChannelCounters channelCounters = new ChannelCounters();
final CountingWritableFrameChannel countingWritableFrameChannel = new CountingWritableFrameChannel(
outputChannel.writable(),
channelCounters,
0
);
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("test"))
.intervals(new LegacySegmentSpec(Intervals.ETERNITY))
.columns("cityName", "added")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(new HashMap<>())
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
new HashMap<>(),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
ImmutableList.of()
);
// Limit output frames to 1 row to ensure we test edge cases
final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
outputSignature,
Collections.emptyList(),
false
),
100
);
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
factChannel.getChannel(),
countingWritableFrameChannel,
frameWriterFactory,
frameReader,
new ObjectMapper(),
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
inputSignature,
maxRowsMaterialized,
ImmutableList.of("added")
);
exec.runFully(processor, null);
final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
outputChannel.readable(),
FrameReader.create(outputSignature)
);
final List<List<Object>> rows = rowsFromProcessor.toList();
long actualNumFrames = Arrays.stream(channelCounters.snapshot().getFrames()).findFirst().getAsLong();
Assert.assertEquals(numFramesWritten, actualNumFrames);
Assert.assertEquals(7, rows.size());
}
private ReadableInput buildWindowTestInputChannel() throws IOException
{
RowSignature inputSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();
List<Map<String, Object>> rows = ImmutableList.of(
ImmutableMap.of("added", 1L, "cityName", "city1"),
ImmutableMap.of("added", 1L, "cityName", "city2"),
ImmutableMap.of("added", 2L, "cityName", "city3"),
ImmutableMap.of("added", 2L, "cityName", "city4"),
ImmutableMap.of("added", 2L, "cityName", "city5"),
ImmutableMap.of("added", 3L, "cityName", "city6"),
ImmutableMap.of("added", 3L, "cityName", "city7")
);
return makeChannelFromRows(rows, inputSignature, Collections.emptyList());
}
private ReadableInput makeChannelFromRows(
List<Map<String, Object>> rows,
RowSignature signature,
List<KeyColumn> keyColumns
) throws IOException
{
RowBasedSegment<Map<String, Object>> segment = new RowBasedSegment<>(
SegmentId.dummy("test"),
Sequences.simple(rows),
columnName -> m -> m.get(columnName),
signature
);
return makeChannelFromAdapter(segment.asStorageAdapter(), keyColumns);
}
private ReadableInput makeChannelFromAdapter(
final StorageAdapter adapter,
final List<KeyColumn> keyColumns
) throws IOException
{
return makeChannelFromAdapter(adapter, keyColumns, 1000);
}
}

View File

@ -21,15 +21,11 @@ package org.apache.druid.msq.querykit.common;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
@ -37,23 +33,17 @@ import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableNilFrameChannel;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsWithSameKeyFault;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.StorageAdapter;
@ -61,15 +51,12 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -82,19 +69,15 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
public class SortMergeJoinFrameProcessorTest extends FrameProcessorTestBase
{
private static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0);
private static final long MAX_BUFFERED_BYTES = 10_000_000;
private final int rowsPerInputFrame;
private final int rowsPerOutputFrame;
private FrameProcessorExecutor exec;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -118,19 +101,6 @@ public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
return constructors;
}
@Before
public void setUp()
{
exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
}
@After
public void tearDown() throws Exception
{
exec.getExecutorService().shutdownNow();
exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
}
@Test
public void testLeftJoinEmptyLeftSide() throws Exception
{
@ -1531,40 +1501,7 @@ public class SortMergeJoinFrameProcessorTest extends InitializedNullHandlingTest
final List<KeyColumn> keyColumns
) throws IOException
{
// Create a single, sorted frame.
final FrameSequenceBuilder singleFrameBuilder =
FrameSequenceBuilder.fromAdapter(adapter)
.frameType(FrameType.ROW_BASED)
.maxRowsPerFrame(Integer.MAX_VALUE)
.sortBy(keyColumns);
final RowSignature signature = singleFrameBuilder.signature();
final Frame frame = Iterables.getOnlyElement(singleFrameBuilder.frames().toList());
// Split it up into frames that match rowsPerFrame. Set max size enough to hold all rows we might ever want to use.
final BlockingQueueFrameChannel channel = new BlockingQueueFrameChannel(10_000);
final FrameReader frameReader = FrameReader.create(signature);
final FrameSequenceBuilder frameSequenceBuilder =
FrameSequenceBuilder.fromAdapter(new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY))
.frameType(FrameType.ROW_BASED)
.maxRowsPerFrame(rowsPerInputFrame);
final Sequence<Frame> frames = frameSequenceBuilder.frames();
frames.forEach(
f -> {
try {
channel.writable().write(f);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
);
channel.writable().close();
return ReadableInput.channel(channel.readable(), FrameReader.create(signature), STAGE_PARTITION);
return makeChannelFromAdapter(adapter, keyColumns, rowsPerInputFrame);
}
private FrameWriterFactory makeFrameWriterFactory(final RowSignature signature)

View File

@ -20,54 +20,32 @@
package org.apache.druid.msq.querykit.results;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class QueryResultsFrameProcessorTest extends InitializedNullHandlingTest
public class QueryResultsFrameProcessorTest extends FrameProcessorTestBase
{
private FrameProcessorExecutor exec;
@Before
public void setUp()
{
exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
}
@After
public void tearDown() throws Exception
{
exec.getExecutorService().shutdownNow();
exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
}
@Test
public void sanityTest() throws ExecutionException, InterruptedException, IOException
{

View File

@ -20,7 +20,6 @@
package org.apache.druid.msq.querykit.scan;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
@ -30,7 +29,6 @@ import org.apache.druid.frame.allocation.HeapMemoryAllocator;
import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
@ -39,11 +37,11 @@ import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
@ -51,35 +49,16 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest
public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
{
private FrameProcessorExecutor exec;
@Before
public void setUp()
{
exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
}
@After
public void tearDown() throws Exception
{
exec.getExecutorService().shutdownNow();
exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
}
@Test
public void test_runWithInputChannel() throws Exception
{

View File

@ -84,11 +84,16 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
}
public static MapOfColumnsRowsAndColumns fromResultRow(ArrayList<ResultRow> objs, RowSignature signature)
{
return fromResultRowTillIndex(objs, signature, objs.size() - 1);
}
public static MapOfColumnsRowsAndColumns fromResultRowTillIndex(ArrayList<ResultRow> objs, RowSignature signature, int index)
{
final Builder bob = builder();
if (!objs.isEmpty()) {
Object[][] columnOriented = new Object[objs.get(0).length()][objs.size()];
for (int i = 0; i < objs.size(); ++i) {
Object[][] columnOriented = new Object[objs.get(0).length()][index + 1];
for (int i = 0; i <= index; ++i) {
for (int j = 0; j < objs.get(i).length(); ++j) {
columnOriented[j][i] = objs.get(i).get(j);
}