WindowOperatorQueryFrameProcessor: Avoid unnecessary re-runs of runIncrementally() (#17211)

This commit is contained in:
Akshat Jain 2024-10-03 15:33:50 +05:30 committed by GitHub
parent 8c4db8aeed
commit edc235cfe1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 8 additions and 6 deletions

View File

@ -279,17 +279,19 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) { } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// Add current row to the same batch of rows for processing. // Add current row to the same batch of rows for processing.
rowsToProcess.add(currentRow); rowsToProcess.add(currentRow);
if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
}
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
} else { } else {
lastPartitionIndex = rowsToProcess.size() - 1; lastPartitionIndex = rowsToProcess.size() - 1;
outputRow = currentRow.copy(); outputRow = currentRow.copy();
return ReturnOrAwait.runAgain(); rowsToProcess.add(currentRow);
} }
frameCursor.advance(); frameCursor.advance();
if (rowsToProcess.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processRowsUpToLastPartition();
ensureMaxRowsInAWindowConstraint(rowsToProcess.size());
return ReturnOrAwait.runAgain();
}
} }
return ReturnOrAwait.runAgain(); return ReturnOrAwait.runAgain();
} }