diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index e5fa0a03d62..06dce22a189 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -40,6 +40,7 @@ import org.apache.druid.frame.write.FrameWriterFactory; import org.apache.druid.frame.write.InvalidFieldException; import org.apache.druid.frame.write.InvalidNullByteException; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.guava.Sequence; @@ -63,6 +64,8 @@ import org.apache.druid.query.Order; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Cursor; @@ -102,6 +105,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor private final Closer closer = Closer.create(); private Cursor cursor; + private Closeable cursorCloser; private Segment segment; private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE); private FrameWriter frameWriter; @@ -156,6 +160,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor @Override public void cleanup() throws IOException { + closer.register(cursorCloser); closer.register(frameWriter); closer.register(super::cleanup); closer.close(); @@ -221,7 +226,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor cursorYielder.close(); return ReturnOrAwait.returnObject(handedOffSegments); } else { - final long rowsFlushed = setNextCursor(cursorYielder.get(), null); + final long rowsFlushed = setNextCursor(cursorYielder.get(), null, null); closer.register(cursorYielder); if (rowsFlushed > 0) { return ReturnOrAwait.runAgain(); @@ -256,16 +261,21 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor ); } - final CursorHolder cursorHolder = closer.register( - cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)) - ); - final Cursor nextCursor = cursorHolder.asCursor(); + final CursorHolder nextCursorHolder = + cursorFactory.makeCursorHolder( + ScanQueryEngine.makeCursorBuildSpec( + query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())), + null + ) + ); + final Cursor nextCursor = nextCursorHolder.asCursor(); if (nextCursor == null) { // No cursors! + nextCursorHolder.close(); return ReturnOrAwait.returnObject(Unit.instance()); } else { - final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment()); + final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, segmentHolder.get().getSegment()); assert rowsFlushed == 0; // There's only ever one cursor when running with a segment } } @@ -302,16 +312,21 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor ); } - final CursorHolder cursorHolder = closer.register( - cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)) - ); - final Cursor nextCursor = cursorHolder.asCursor(); + final CursorHolder nextCursorHolder = + cursorFactory.makeCursorHolder( + ScanQueryEngine.makeCursorBuildSpec( + query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)), + null + ) + ); + final Cursor nextCursor = nextCursorHolder.asCursor(); if (nextCursor == null) { // no cursor + nextCursorHolder.close(); return ReturnOrAwait.returnObject(Unit.instance()); } - final long rowsFlushed = setNextCursor(nextCursor, frameSegment); + final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, frameSegment); if (rowsFlushed > 0) { return ReturnOrAwait.runAgain(); @@ -415,10 +430,20 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor } } - private long setNextCursor(final Cursor cursor, final Segment segment) throws IOException + private long setNextCursor( + final Cursor cursor, + @Nullable final Closeable cursorCloser, + final Segment segment + ) throws IOException { final long rowsFlushed = flushFrameWriter(); + if (this.cursorCloser != null) { + // Close here, don't add to the processor-level Closer, to avoid leaking CursorHolders. We may generate many + // CursorHolders per instance of this processor, and we need to close them as we go, not all at the end. + this.cursorCloser.close(); + } this.cursor = cursor; + this.cursorCloser = cursorCloser; this.segment = segment; this.cursorOffset.reset(); return rowsFlushed; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java index 97ade19f5bc..1636e117740 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java @@ -56,8 +56,7 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor { super(query); this.query = Preconditions.checkNotNull(query, "query"); - this.runningCountForLimit = - query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null; + this.runningCountForLimit = query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null; } @JsonProperty diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java index bfb511f949f..af0a7203570 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java @@ -19,9 +19,11 @@ package org.apache.druid.msq.querykit.scan; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.collections.StupidResourceHolder; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; @@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.msq.input.ReadableInput; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.StagePartition; import org.apache.druid.msq.querykit.FrameProcessorTestBase; @@ -46,10 +50,15 @@ import org.apache.druid.msq.test.LimitedFrameWriterFactory; import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexCursorFactory; +import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; +import org.apache.druid.timeline.SegmentId; import org.junit.Assert; import org.junit.Test; @@ -60,6 +69,91 @@ import java.util.function.Function; public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase { + + @Test + public void test_runWithSegments() throws Exception + { + final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex(); + + final CursorFactory cursorFactory = + new QueryableIndexCursorFactory(queryableIndex); + + // put funny intervals on query to ensure it is adjusted to the segment interval before building cursor + final ScanQuery query = + Druids.newScanQueryBuilder() + .dataSource("test") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of( + Intervals.of("2001-01-01T00Z/2011-01-01T00Z"), + Intervals.of("2011-01-02T00Z/2021-01-01T00Z") + ) + ) + ) + .columns(cursorFactory.getRowSignature().getColumnNames()) + .build(); + + final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal(); + + // Limit output frames to 1 row to ensure we test edge cases + final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory( + FrameWriters.makeRowBasedFrameWriterFactory( + new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()), + cursorFactory.getRowSignature(), + Collections.emptyList(), + false + ), + 1 + ); + + final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor( + query, + null, + new DefaultObjectMapper(), + ReadableInput.segment( + new SegmentWithDescriptor( + () -> new StupidResourceHolder<>(new CompleteSegment(null, new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")))), + new RichSegmentDescriptor(queryableIndex.getDataInterval(), queryableIndex.getDataInterval(), "dummy_version", 0) + ) + ), + Function.identity(), + new ResourceHolder() + { + @Override + public WritableFrameChannel get() + { + return outputChannel.writable(); + } + + @Override + public void close() + { + try { + outputChannel.writable().close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }, + new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {}) + ); + + ListenableFuture retVal = exec.runFully(processor, null); + + final Sequence> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel( + outputChannel.readable(), + FrameReader.create(cursorFactory.getRowSignature()) + ); + + FrameTestUtil.assertRowsEqual( + FrameTestUtil.readRowsFromCursorFactory(cursorFactory, cursorFactory.getRowSignature(), false), + rowsFromProcessor + ); + + Assert.assertEquals(Unit.instance(), retVal.get()); + } + @Test public void test_runWithInputChannel() throws Exception { @@ -83,10 +177,18 @@ public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase } } + // put funny intervals on query to ensure it is adjusted to the segment interval before building cursor final ScanQuery query = Druids.newScanQueryBuilder() .dataSource("test") - .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of( + Intervals.of("2001-01-01T00Z/2011-01-01T00Z"), + Intervals.of("2011-01-02T00Z/2021-01-01T00Z") + ) + ) + ) .columns(cursorFactory.getRowSignature().getColumnNames()) .build();