[Backport] WindowOperatorQueryFrameProcessor fixes (#17209) (#17211) (#17231)

* WindowOperatorQueryFrameProcessor: Fix frame writer capacity issues + adhere to FrameProcessor's contract (#17209)
* WindowOperatorQueryFrameProcessor: Avoid unnecessary re-runs of runIncrementally() (#17211)
This commit is contained in:
Akshat Jain 2024-10-03 19:53:43 +05:30 committed by GitHub
parent e4d1848aba
commit feae7f7649
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 237 additions and 57 deletions

View File

@ -83,7 +83,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameWriterFactory frameWriterFactory;
private final FrameReader frameReader;
private final int maxRowsMaterialized;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
private ResultRow outputRow = null;
@ -99,6 +98,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final ArrayList<ResultRow> rowsToProcess;
private int lastPartitionIndex = -1;
final AtomicInteger rowId = new AtomicInteger(0);
public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
@ -155,7 +156,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
}
@Override
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
{
/*
There are 2 scenarios:
@ -216,32 +217,54 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. We might think to reimplement them in the MSQ way so that we do not have to materialize so much data.
*/
// If there are rows pending flush, flush them and run again before processing any more rows.
if (frameHasRowsPendingFlush()) {
flushAllRowsAndCols();
return ReturnOrAwait.runAgain();
}
if (partitionColumnNames.isEmpty()) {
// Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY.
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
convertRowFrameToRowsAndColumns(frame);
return ReturnOrAwait.runAgain();
} else if (inputChannel.isFinished()) {
runAllOpsOnMultipleRac(frameRowsAndCols);
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
if (inputChannel.isFinished()) {
// If no rows are flushed yet, process all rows.
if (rowId.get() == 0) {
runAllOpsOnMultipleRac(frameRowsAndCols);
}
// If there are still rows pending after operations, run again.
if (frameHasRowsPendingFlush()) {
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
}
return ReturnOrAwait.awaitAll(inputChannels().size());
}
// Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY
if (frameCursor == null || frameCursor.isDone()) {
if (readableInputs.isEmpty()) {
return ReturnOrAwait.awaitAll(1);
} else if (inputChannel.canRead()) {
}
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
makeRowSupplierFromFrameCursor();
} else if (inputChannel.isFinished()) {
// Handle any remaining data.
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
// If we have some rows pending processing, process them.
// We run it again as it's possible that frame writer's capacity got reached and some output rows are
// pending flush to the output channel.
if (!rowsToProcess.isEmpty()) {
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
@ -256,17 +279,19 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// Add current row to the same batch of rows for processing.
rowsToProcess.add(currentRow);
if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
}
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
} else {
lastPartitionIndex = rowsToProcess.size() - 1;
outputRow = currentRow.copy();
return ReturnOrAwait.runAgain();
rowsToProcess.add(currentRow);
}
frameCursor.advance();
if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
return ReturnOrAwait.runAgain();
}
}
return ReturnOrAwait.runAgain();
}
@ -313,41 +338,30 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
public void completed()
{
try {
// resultRowsAndCols has reference to frameRowsAndCols
// due to the chain of calls across the ops
// so we can clear after writing to output
flushAllRowsAndCols(resultRowAndCols);
frameRowsAndCols.clear();
flushAllRowsAndCols();
}
catch (IOException e) {
throw new RuntimeException(e);
}
finally {
frameRowsAndCols.clear();
resultRowAndCols.clear();
}
}
});
}
/**
* @param resultRowAndCols Flush the list of {@link RowsAndColumns} to a frame
* Flushes {@link #resultRowAndCols} to the frame starting from {@link #rowId}, upto the frame writer's capacity.
* @throws IOException
*/
private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) throws IOException
private void flushAllRowsAndCols() throws IOException
{
RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols);
AtomicInteger rowId = new AtomicInteger(0);
createFrameWriterIfNeeded(rac, rowId);
writeRacToFrame(rac, rowId);
createFrameWriterIfNeeded(rac);
writeRacToFrame(rac);
}
/**
* @param rac The frame writer to write this {@link RowsAndColumns} object
* @param rowId RowId to get the column selector factory from the {@link RowsAndColumns} object
*/
private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId)
private void createFrameWriterIfNeeded(RowsAndColumns rac)
{
if (frameWriter == null) {
final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
@ -355,32 +369,38 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
final ColumnSelectorFactory frameWriterColumnSelectorFactoryWithVirtualColumns =
frameWriterVirtualColumns.wrap(frameWriterColumnSelectorFactory);
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactoryWithVirtualColumns);
currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
}
/**
* @param rac {@link RowsAndColumns} to be written to frame
* @param rowId Counter to keep track of how many rows are added
* @throws IOException
*/
public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws IOException
public void writeRacToFrame(RowsAndColumns rac) throws IOException
{
final int numRows = rac.numRows();
rowId.set(0);
while (rowId.get() < numRows) {
final boolean didAddToFrame = frameWriter.addSelection();
if (didAddToFrame) {
if (frameWriter.addSelection()) {
incrementBoostColumn();
rowId.incrementAndGet();
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
} else if (frameWriter.getNumRows() == 0) {
throw new FrameRowTooLargeException(currentAllocatorCapacity);
} else {
} else if (frameWriter.getNumRows() > 0) {
flushFrameWriter();
return;
createFrameWriterIfNeeded(rac);
if (frameWriter.addSelection()) {
incrementBoostColumn();
rowId.incrementAndGet();
return;
} else {
throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
}
} else {
throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
}
}
flushFrameWriter();
clearRACBuffers();
}
@Override
@ -521,4 +541,28 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
));
}
}
/**
* Increments the value of the partition boosting column. It should be called once the row value has been written
* to the frame
*/
private void incrementBoostColumn()
{
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
}
/**
* @return true if frame has rows pending flush to the output channel, false otherwise.
*/
private boolean frameHasRowsPendingFlush()
{
return frameWriter != null && frameWriter.getNumRows() > 0;
}
private void clearRACBuffers()
{
frameRowsAndCols.clear();
resultRowAndCols.clear();
rowId.set(0);
}
}

View File

@ -66,6 +66,99 @@ import java.util.Map;
public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBase
{
private static final List<Map<String, Object>> INPUT_ROWS = ImmutableList.of(
ImmutableMap.of("added", 1L, "cityName", "city1"),
ImmutableMap.of("added", 1L, "cityName", "city2"),
ImmutableMap.of("added", 2L, "cityName", "city3"),
ImmutableMap.of("added", 2L, "cityName", "city4"),
ImmutableMap.of("added", 2L, "cityName", "city5"),
ImmutableMap.of("added", 3L, "cityName", "city6"),
ImmutableMap.of("added", 3L, "cityName", "city7")
);
@Test
public void testFrameWriterReachingCapacity() throws IOException
{
// This test validates that all output rows are flushed to the output channel even if frame writer's
// capacity is reached, by subsequent iterations of runIncrementally.
final ReadableInput factChannel = buildWindowTestInputChannel();
RowSignature inputSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();
FrameReader frameReader = FrameReader.create(inputSignature);
RowSignature outputSignature = RowSignature.builder()
.addAll(inputSignature)
.add("w0", ColumnType.LONG)
.build();
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("test"))
.intervals(new LegacySegmentSpec(Intervals.ETERNITY))
.columns("cityName", "added")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(new HashMap<>())
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
new HashMap<>(),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
ImmutableList.of()
);
final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
outputSignature,
Collections.emptyList(),
false
),
INPUT_ROWS.size() / 4 // This forces frameWriter's capacity to be reached.
);
final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
frameReader,
new ObjectMapper(),
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
inputSignature,
100,
ImmutableList.of("added")
);
exec.runFully(processor, null);
final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
outputChannel.readable(),
FrameReader.create(outputSignature)
);
List<List<Object>> outputRows = rowsFromProcessor.toList();
Assert.assertEquals(INPUT_ROWS.size(), outputRows.size());
for (int i = 0; i < INPUT_ROWS.size(); i++) {
Map<String, Object> inputRow = INPUT_ROWS.get(i);
List<Object> outputRow = outputRows.get(i);
Assert.assertEquals("cityName should match", inputRow.get("cityName"), outputRow.get(0));
Assert.assertEquals("added should match", inputRow.get("added"), outputRow.get(1));
Assert.assertEquals("row_number() should be correct", (long) i + 1, outputRow.get(2));
}
}
@Test
public void testBatchingOfPartitionByKeys_singleBatch() throws Exception
{
@ -195,18 +288,7 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();
List<Map<String, Object>> rows = ImmutableList.of(
ImmutableMap.of("added", 1L, "cityName", "city1"),
ImmutableMap.of("added", 1L, "cityName", "city2"),
ImmutableMap.of("added", 2L, "cityName", "city3"),
ImmutableMap.of("added", 2L, "cityName", "city4"),
ImmutableMap.of("added", 2L, "cityName", "city5"),
ImmutableMap.of("added", 3L, "cityName", "city6"),
ImmutableMap.of("added", 3L, "cityName", "city7")
);
return makeChannelFromRows(rows, inputSignature, Collections.emptyList());
return makeChannelFromRows(INPUT_ROWS, inputSignature, Collections.emptyList());
}
private ReadableInput makeChannelFromRows(

View File

@ -0,0 +1,54 @@
!set plannerStrategy DECOUPLED
!use druidtest://?componentSupplier=DrillWindowQueryMSQComponentSupplier
!set outputformat mysql
# This test validates that all output rows are flushed to the output channel even if frame writer's capacity is reached.
select count(*) as actualNumRows
from (
select countryName, cityName, channel, added, delta, row_number() over() as rowNumber
from wikipedia
group by countryName, cityName, channel, added, delta
);
+---------------+
| actualNumRows |
+---------------+
| 11631 |
+---------------+
(1 row)
!ok
# Validate that all rows are outputted by window WindowOperatorQueryFrameProcessor layer for empty over() clause scenario.
select count(*) as numRows, max(rowNumber) as maxRowNumber
from (
select countryName, cityName, channel, added, delta, row_number() over() as rowNumber
from wikipedia
group by countryName, cityName, channel, added, delta
);
+---------+--------------+
| numRows | maxRowNumber |
+---------+--------------+
| 11631 | 11631 |
+---------+--------------+
(1 row)
!ok
# Validate that all rows are outputted by window WindowOperatorQueryFrameProcessor layer for non-empty over() clause scenario.
select rowNumber, count(rowNumber) as numRows
from (
select countryName, cityName, channel, added, delta, row_number() over(partition by countryName, cityName, channel, added, delta) as rowNumber
from wikipedia
group by countryName, cityName, channel, added, delta
) group by rowNumber;
+-----------+---------+
| rowNumber | numRows |
+-----------+---------+
| 1 | 11631 |
+-----------+---------+
(1 row)
!ok