WindowOperatorQueryFrameProcessor: Avoid writing multiple frames to output channel in runIncrementally() (#17373)

WindowOperatorQueryFrameProcessor: Avoid writing multiple frames to output channel in runIncrementally()
This commit is contained in:
Akshat Jain 2024-10-23 10:34:37 +05:30 committed by GitHub
parent 43b325b6aa
commit 1e96c85b38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 87 additions and 0 deletions

View File

@ -144,6 +144,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
if (needToProcessBatch()) {
runAllOpsOnBatch();
if (inputChannel.isFinished()) {
return ReturnOrAwait.runAgain();
}
flushAllRowsAndCols();
}
return ReturnOrAwait.runAgain();

View File

@ -156,6 +156,90 @@ public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBas
}
}
@Test
public void testOutputChannelReachingCapacity() throws IOException
{
// This test validates that we don't end up writing multiple (2) frames to the output channel while reading from the input channel,
// in the scenario when the input channel has finished and receiver's completed() gets called.
final ReadableInput factChannel = buildWindowTestInputChannel();
RowSignature inputSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();
FrameReader frameReader = FrameReader.create(inputSignature);
RowSignature outputSignature = RowSignature.builder()
.addAll(inputSignature)
.add("w0", ColumnType.LONG)
.build();
final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("test"))
.intervals(new LegacySegmentSpec(Intervals.ETERNITY))
.columns("cityName", "added")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(new HashMap<>())
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
new HashMap<>(
// This ends up satisfying the criteria of needToProcessBatch() method,
// so we end up processing the rows we've read, hence writing the 1st frame to the output channel.
ImmutableMap.of(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 12)
),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
ImmutableList.of()
);
final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new ArenaMemoryAllocatorFactory(1 << 20),
outputSignature,
Collections.emptyList(),
false
),
INPUT_ROWS.size() / 4 // This forces frameWriter's capacity to be reached, hence requiring another write.
);
final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
frameReader,
new ObjectMapper(),
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
)
);
exec.runFully(processor, null);
final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
outputChannel.readable(),
FrameReader.create(outputSignature)
);
List<List<Object>> outputRows = rowsFromProcessor.toList();
Assert.assertEquals(INPUT_ROWS.size(), outputRows.size());
for (int i = 0; i < INPUT_ROWS.size(); i++) {
Map<String, Object> inputRow = INPUT_ROWS.get(i);
List<Object> outputRow = outputRows.get(i);
Assert.assertEquals("cityName should match", inputRow.get("cityName"), outputRow.get(0));
Assert.assertEquals("added should match", inputRow.get("added"), outputRow.get(1));
Assert.assertEquals("row_number() should be correct", (long) i + 1, outputRow.get(2));
}
}
@Test
public void testProcessorRun() throws Exception
{