[Backport] ScanQueryFrameProcessor: Close CursorHolders as we go along. (#17152) (#17168) (#17245)

* ScanQueryFrameProcessor: Close CursorHolders as we go along. (#17152)
* Fix ScanQueryFrameProcessor cursor build not adjusting the query intervals to the segment being read (#17168)
---------
Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
Co-authored-by: Clint Wylie <cwylie@apache.org>
This commit is contained in:
Kashif Faraz 2024-10-04 17:13:55 +05:30 committed by GitHub
parent 50eb7321d2
commit c1622be527
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 141 additions and 15 deletions

View File

@ -40,6 +40,7 @@ import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.InvalidFieldException; import org.apache.druid.frame.write.InvalidFieldException;
import org.apache.druid.frame.write.InvalidNullByteException; import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
@ -63,6 +64,8 @@ import org.apache.druid.query.Order;
import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Cursor; import org.apache.druid.segment.Cursor;
@ -102,6 +105,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
private final Closer closer = Closer.create(); private final Closer closer = Closer.create();
private Cursor cursor; private Cursor cursor;
private Closeable cursorCloser;
private Segment segment; private Segment segment;
private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE); private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE);
private FrameWriter frameWriter; private FrameWriter frameWriter;
@ -156,6 +160,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
@Override @Override
public void cleanup() throws IOException public void cleanup() throws IOException
{ {
closer.register(cursorCloser);
closer.register(frameWriter); closer.register(frameWriter);
closer.register(super::cleanup); closer.register(super::cleanup);
closer.close(); closer.close();
@ -221,7 +226,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
cursorYielder.close(); cursorYielder.close();
return ReturnOrAwait.returnObject(handedOffSegments); return ReturnOrAwait.returnObject(handedOffSegments);
} else { } else {
final long rowsFlushed = setNextCursor(cursorYielder.get(), null); final long rowsFlushed = setNextCursor(cursorYielder.get(), null, null);
closer.register(cursorYielder); closer.register(cursorYielder);
if (rowsFlushed > 0) { if (rowsFlushed > 0) {
return ReturnOrAwait.runAgain(); return ReturnOrAwait.runAgain();
@ -256,16 +261,21 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
); );
} }
final CursorHolder cursorHolder = closer.register( final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)) cursorFactory.makeCursorHolder(
); ScanQueryEngine.makeCursorBuildSpec(
final Cursor nextCursor = cursorHolder.asCursor(); query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())),
null
)
);
final Cursor nextCursor = nextCursorHolder.asCursor();
if (nextCursor == null) { if (nextCursor == null) {
// No cursors! // No cursors!
nextCursorHolder.close();
return ReturnOrAwait.returnObject(Unit.instance()); return ReturnOrAwait.returnObject(Unit.instance());
} else { } else {
final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment()); final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, segmentHolder.get().getSegment());
assert rowsFlushed == 0; // There's only ever one cursor when running with a segment assert rowsFlushed == 0; // There's only ever one cursor when running with a segment
} }
} }
@ -302,16 +312,21 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
); );
} }
final CursorHolder cursorHolder = closer.register( final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)) cursorFactory.makeCursorHolder(
); ScanQueryEngine.makeCursorBuildSpec(
final Cursor nextCursor = cursorHolder.asCursor(); query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
null
)
);
final Cursor nextCursor = nextCursorHolder.asCursor();
if (nextCursor == null) { if (nextCursor == null) {
// no cursor // no cursor
nextCursorHolder.close();
return ReturnOrAwait.returnObject(Unit.instance()); return ReturnOrAwait.returnObject(Unit.instance());
} }
final long rowsFlushed = setNextCursor(nextCursor, frameSegment); final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, frameSegment);
if (rowsFlushed > 0) { if (rowsFlushed > 0) {
return ReturnOrAwait.runAgain(); return ReturnOrAwait.runAgain();
@ -415,10 +430,20 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
} }
} }
private long setNextCursor(final Cursor cursor, final Segment segment) throws IOException private long setNextCursor(
final Cursor cursor,
@Nullable final Closeable cursorCloser,
final Segment segment
) throws IOException
{ {
final long rowsFlushed = flushFrameWriter(); final long rowsFlushed = flushFrameWriter();
if (this.cursorCloser != null) {
// Close here, don't add to the processor-level Closer, to avoid leaking CursorHolders. We may generate many
// CursorHolders per instance of this processor, and we need to close them as we go, not all at the end.
this.cursorCloser.close();
}
this.cursor = cursor; this.cursor = cursor;
this.cursorCloser = cursorCloser;
this.segment = segment; this.segment = segment;
this.cursorOffset.reset(); this.cursorOffset.reset();
return rowsFlushed; return rowsFlushed;

View File

@ -56,8 +56,7 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor
{ {
super(query); super(query);
this.query = Preconditions.checkNotNull(query, "query"); this.query = Preconditions.checkNotNull(query, "query");
this.runningCountForLimit = this.runningCountForLimit = query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null;
query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null;
} }
@JsonProperty @JsonProperty

View File

@ -19,9 +19,11 @@
package org.apache.druid.msq.querykit.scan; package org.apache.druid.msq.querykit.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.collections.StupidResourceHolder;
import org.apache.druid.frame.Frame; import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType; import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition; import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.FrameProcessorTestBase; import org.apache.druid.msq.querykit.FrameProcessorTestBase;
@ -46,10 +50,15 @@ import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -60,6 +69,91 @@ import java.util.function.Function;
public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
{ {
/**
 * Runs a {@link ScanQueryFrameProcessor} over a single memory-mapped test segment and checks that the rows it
 * writes to the output channel equal the rows read directly from the segment's cursor factory.
 *
 * The query deliberately carries intervals unrelated to the segment, so the test only passes if the processor
 * adjusts the query to the segment interval before building its cursor.
 */
@Test
public void test_runWithSegments() throws Exception
{
  final QueryableIndex index = TestIndex.getMMappedTestIndex();
  final CursorFactory cursorFactory = new QueryableIndexCursorFactory(index);
  final RowSignature signature = cursorFactory.getRowSignature();

  // Intervals that do not line up with the segment: the processor is expected to override them.
  final MultipleIntervalSegmentSpec mismatchedIntervals = new MultipleIntervalSegmentSpec(
      ImmutableList.of(
          Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
          Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
      )
  );
  final ScanQuery scanQuery = Druids.newScanQueryBuilder()
                                    .dataSource("test")
                                    .intervals(mismatchedIntervals)
                                    .columns(signature.getColumnNames())
                                    .build();

  // One row per output frame, so frame-boundary edge cases are exercised on every row.
  final FrameWriterFactory singleRowWriterFactory = new LimitedFrameWriterFactory(
      FrameWriters.makeRowBasedFrameWriterFactory(
          new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
          signature,
          Collections.emptyList(),
          false
      ),
      1
  );

  final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();

  // Holder handed to the processor; closing it closes the writable side of the output channel.
  final ResourceHolder<WritableFrameChannel> outputChannelHolder = new ResourceHolder<WritableFrameChannel>()
  {
    @Override
    public WritableFrameChannel get()
    {
      return outputChannel.writable();
    }

    @Override
    public void close()
    {
      try {
        outputChannel.writable().close();
      }
      catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  };

  final RichSegmentDescriptor segmentDescriptor = new RichSegmentDescriptor(
      index.getDataInterval(),
      index.getDataInterval(),
      "dummy_version",
      0
  );

  // The segment is created lazily inside the supplier, once per invocation.
  final ReadableInput segmentInput = ReadableInput.segment(
      new SegmentWithDescriptor(
          () -> new StupidResourceHolder<>(
              new CompleteSegment(null, new QueryableIndexSegment(index, SegmentId.dummy("test")))
          ),
          segmentDescriptor
      )
  );

  final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
      scanQuery,
      null,
      new DefaultObjectMapper(),
      segmentInput,
      Function.identity(),
      outputChannelHolder,
      new ReferenceCountingResourceHolder<>(singleRowWriterFactory, () -> {})
  );

  final ListenableFuture<Object> processorResult = exec.runFully(processor, null);

  final Sequence<List<Object>> actualRows = FrameTestUtil.readRowsFromFrameChannel(
      outputChannel.readable(),
      FrameReader.create(signature)
  );
  final Sequence<List<Object>> expectedRows =
      FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, false);

  FrameTestUtil.assertRowsEqual(expectedRows, actualRows);

  Assert.assertEquals(Unit.instance(), processorResult.get());
}
@Test @Test
public void test_runWithInputChannel() throws Exception public void test_runWithInputChannel() throws Exception
{ {
@ -83,10 +177,18 @@ public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
} }
} }
// Put mismatched intervals on the query to ensure they are overridden (with eternity, for frame-channel input) before the cursor is built
final ScanQuery query = final ScanQuery query =
Druids.newScanQueryBuilder() Druids.newScanQueryBuilder()
.dataSource("test") .dataSource("test")
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) .intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
)
)
)
.columns(cursorFactory.getRowSignature().getColumnNames()) .columns(cursorFactory.getRowSignature().getColumnNames())
.build(); .build();