From 491087fbe305ee9e057ca6b8eef09e7f33bd986c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 2 Oct 2024 14:34:27 +0530 Subject: [PATCH] Modify DataSegmentProvider to also return DataSegment (#17021) (#17217) Currently, TaskDataSegmentProvider fetches the DataSegment from the Coordinator while loading the segment, but just discards it later. This PR refactors this to also return the DataSegment so that it can be used by workers without a separate fetch. Co-authored-by: Adarsh Sanjeev --- .../msq/exec/TaskDataSegmentProvider.java | 24 +++--- .../external/ExternalInputSliceReader.java | 3 +- .../input/inline/InlineInputSliceReader.java | 3 +- .../input/lookup/LookupInputSliceReader.java | 3 +- .../input/table/SegmentWithDescriptor.java | 7 +- .../msq/querykit/DataSegmentProvider.java | 4 +- .../GroupByPreShuffleFrameProcessor.java | 6 +- .../scan/ScanQueryFrameProcessor.java | 7 +- .../msq/exec/TaskDataSegmentProviderTest.java | 8 +- .../druid/msq/test/CalciteMSQTestsHelper.java | 21 +++-- .../apache/druid/msq/test/MSQTestBase.java | 24 ++++-- .../MSQTestDelegateDataSegmentPusher.java | 2 +- .../druid/msq/test/MSQTestSegmentManager.java | 19 ++--- .../apache/druid/segment/CompleteSegment.java | 81 +++++++++++++++++++ .../druid/segment/CompleteSegmentTest.java | 49 +++++++++++ 15 files changed, 209 insertions(+), 52 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/CompleteSegment.java create mode 100644 processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java index c327ec340ae..04b08f9346a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java @@ -29,10 +29,10 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.querykit.DataSegmentProvider; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; @@ -70,7 +70,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider } @Override - public Supplier> fetchSegment( + public Supplier> fetchSegment( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -79,7 +79,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider // Returns Supplier instead of ResourceHolder, so the Coordinator calls and segment downloads happen // in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.) return () -> { - ResourceHolder holder = null; + ResourceHolder holder = null; while (holder == null) { holder = holders.computeIfAbsent( @@ -99,7 +99,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it * is determined that we definitely need to go out and get one. */ - private ReferenceCountingResourceHolder fetchSegmentInternal( + private ReferenceCountingResourceHolder fetchSegmentInternal( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -133,7 +133,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider final int numRows = index.getNumRows(); final long size = dataSegment.getSize(); closer.register(() -> channelCounters.addFile(numRows, size)); - return new ReferenceCountingResourceHolder<>(segment, closer); + return new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), closer); } catch (IOException | SegmentLoadingException e) { throw CloseableUtils.closeInCatch( @@ -143,13 +143,13 @@ public class TaskDataSegmentProvider implements DataSegmentProvider } } - private static class SegmentHolder implements Supplier> + private static class SegmentHolder implements Supplier> { - private final Supplier> holderSupplier; + private final Supplier> holderSupplier; private final Closeable cleanupFn; @GuardedBy("this") - private ReferenceCountingResourceHolder holder; + private ReferenceCountingResourceHolder holder; @GuardedBy("this") private boolean closing; @@ -157,7 +157,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider @GuardedBy("this") private boolean closed; - public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) + public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) { this.holderSupplier = holderSupplier; this.cleanupFn = cleanupFn; @@ -165,7 +165,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider @Override @Nullable - public ResourceHolder get() + public ResourceHolder get() { synchronized (this) { if (closing) { @@ -183,7 +183,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider // Then, return null so "fetchSegment" will try again. return null; } else if (holder == null) { - final ResourceHolder segmentHolder = holderSupplier.get(); + final ResourceHolder segmentHolder = holderSupplier.get(); holder = new ReferenceCountingResourceHolder<>( segmentHolder.get(), () -> { @@ -210,7 +210,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider } } ); - final ResourceHolder retVal = holder.increment(); + final ResourceHolder retVal = holder.increment(); // Store already-closed holder, so it disappears when the last reference is closed. holder.close(); return retVal; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java index 4b68a3bf1b0..2a863fa5525 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java @@ -45,6 +45,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.msq.util.DimensionSchemaUtils; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.column.ColumnHolder; @@ -163,7 +164,7 @@ public class ExternalInputSliceReader implements InputSliceReader signature ); return new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(segment), + () -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)), new RichSegmentDescriptor(segmentId.toDescriptor(), null) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java index ef58c7723b3..8a05ce1527e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java @@ -30,6 +30,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.InlineSegmentWrangler; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.timeline.SegmentId; @@ -74,7 +75,7 @@ public class InlineInputSliceReader implements InputSliceReader segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY), segment -> ReadableInput.segment( new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(segment), + () -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)), DUMMY_SEGMENT_DESCRIPTOR ) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java index 2b327f216f7..85f0b10718d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java @@ -32,6 +32,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.query.LookupDataSource; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.timeline.SegmentId; @@ -98,7 +99,7 @@ public class LookupInputSliceReader implements InputSliceReader throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName); } - return ResourceHolder.fromCloseable(segment); + return ResourceHolder.fromCloseable(new CompleteSegment(null, segment)); }, new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java index b9026c7b9fb..343f7994d1e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java @@ -21,6 +21,7 @@ package org.apache.druid.msq.input.table; import com.google.common.base.Preconditions; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Segment; import java.util.Objects; @@ -31,7 +32,7 @@ import java.util.function.Supplier; */ public class SegmentWithDescriptor { - private final Supplier> segmentSupplier; + private final Supplier> segmentSupplier; private final RichSegmentDescriptor descriptor; /** @@ -42,7 +43,7 @@ public class SegmentWithDescriptor * @param descriptor segment descriptor */ public SegmentWithDescriptor( - final Supplier> segmentSupplier, + final Supplier> segmentSupplier, final RichSegmentDescriptor descriptor ) { @@ -59,7 +60,7 @@ public class SegmentWithDescriptor * It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()} * is enough. */ - public ResourceHolder getOrLoad() + public ResourceHolder getOrLoad() { return segmentSupplier.get(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 91ee4a48788..232e85166a0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -21,7 +21,7 @@ package org.apache.druid.msq.querykit; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.msq.counters.ChannelCounters; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.timeline.SegmentId; import java.util.function.Supplier; @@ -35,7 +35,7 @@ public interface DataSegmentProvider *
* It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. */ - Supplier> fetchSegment( + Supplier> fetchSegment( SegmentId segmentId, ChannelCounters channelCounters, boolean isReindex diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index 05c6f36c35f..ad456a45f5b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -54,7 +54,7 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.TimeBoundaryInspector; import org.apache.druid.segment.column.RowSignature; @@ -152,8 +152,8 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (resultYielder == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); - final SegmentReference mappedSegment = mapSegment(segmentHolder.get()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final SegmentReference mappedSegment = mapSegment(segmentHolder.get().getSegment()); final Sequence rowSequence = groupingEngine.process( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index fbcae67012a..e5fa0a03d62 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -64,6 +64,7 @@ import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorHolder; @@ -245,9 +246,9 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (cursor == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); - final Segment mappedSegment = mapSegment(segmentHolder.get()); + final Segment mappedSegment = mapSegment(segmentHolder.get().getSegment()); final CursorFactory cursorFactory = mappedSegment.asCursorFactory(); if (cursorFactory == null) { throw new ISE( @@ -264,7 +265,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor // No cursors! return ReturnOrAwait.returnObject(Unit.instance()); } else { - final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get()); + final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment()); assert rowsFlushed == 0; // There's only ever one cursor when running with a segment } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java index b5141e12dc8..a9bf5e91bb4 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java @@ -43,11 +43,11 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.query.OrderBy; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.DimensionHandler; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.ColumnHolder; @@ -187,7 +187,7 @@ public class TaskDataSegmentProviderTest for (int i = 0; i < iterations; i++) { final int expectedSegmentNumber = i % NUM_SEGMENTS; final DataSegment segment = segments.get(expectedSegmentNumber); - final ListenableFuture>> f = + final ListenableFuture>> f = exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false)); testFutures.add( @@ -195,8 +195,8 @@ public class TaskDataSegmentProviderTest f, holderSupplier -> { try { - final ResourceHolder holder = holderSupplier.get(); - Assert.assertEquals(segment.getId(), holder.get().getId()); + final ResourceHolder holder = holderSupplier.get(); + Assert.assertEquals(segment.getId(), holder.get().getSegment().getId()); final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); final File expectedFile = new File( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 13dac0da6d4..7d65cea9872 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -61,15 +61,14 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.IndexBuilder; -import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexCursorFactory; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestIndex; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; @@ -86,6 +85,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.easymock.EasyMock; import org.joda.time.Interval; import org.mockito.Mockito; @@ -161,11 +161,10 @@ public class CalciteMSQTestsHelper ) ); ObjectMapper testMapper = MSQTestBase.setupObjectMapper(dummyInjector); - IndexIO indexIO = new IndexIO(testMapper, ColumnConfig.DEFAULT); SegmentCacheManager segmentCacheManager = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, testMapper) .manufacturate(cacheManagerDir); LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig(); - MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO); + MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager); config.storageDirectory = storageDir; binder.bind(DataSegmentPusher.class).toProvider(() -> new MSQTestDelegateDataSegmentPusher( new LocalDataSegmentPusher(config), @@ -206,7 +205,10 @@ public class CalciteMSQTestsHelper return mockFactory; } - private static Supplier> getSupplierForSegment(Function tempFolderProducer, SegmentId segmentId) + protected static Supplier> getSupplierForSegment( + Function tempFolderProducer, + SegmentId segmentId + ) { final QueryableIndex index; switch (segmentId.getDataSource()) { @@ -450,6 +452,13 @@ public class CalciteMSQTestsHelper { } }; - return () -> new ReferenceCountingResourceHolder<>(segment, Closer.create()); + DataSegment dataSegment = DataSegment.builder() + .dataSource(segmentId.getDataSource()) + .interval(segmentId.getInterval()) + .version(segmentId.getVersion()) + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + return () -> new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), Closer.create()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index ced9b85bdc0..6cfc37df9dd 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -134,6 +134,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexIO; @@ -197,6 +198,7 @@ import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.PruneLoadSpec; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.TombstoneShardSpec; @@ -421,7 +423,7 @@ public class MSQTestBase extends BaseCalciteQueryTest MSQSqlModule sqlModule = new MSQSqlModule(); - segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO); + segmentManager = new MSQTestSegmentManager(segmentCacheManager); BrokerClient brokerClient = mock(BrokerClient.class); List modules = ImmutableList.of( @@ -631,7 +633,10 @@ public class MSQTestBase extends BaseCalciteQueryTest } @Nonnull - private Supplier> getSupplierForSegment(Function tempFolderProducer, SegmentId segmentId) + protected Supplier> getSupplierForSegment( + Function tempFolderProducer, + SegmentId segmentId + ) { if (segmentManager.getSegment(segmentId) == null) { final QueryableIndex index; @@ -722,7 +727,14 @@ public class MSQTestBase extends BaseCalciteQueryTest }; segmentManager.addSegment(segment); } - return () -> ReferenceCountingResourceHolder.fromCloseable(segmentManager.getSegment(segmentId)); + DataSegment dataSegment = DataSegment.builder() + .dataSource(segmentId.getDataSource()) + .interval(segmentId.getInterval()) + .version(segmentId.getVersion()) + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + return () -> ReferenceCountingResourceHolder.fromCloseable(new CompleteSegment(dataSegment, segmentManager.getSegment(segmentId))); } public SelectTester testSelectQuery() @@ -1232,17 +1244,17 @@ public class MSQTestBase extends BaseCalciteQueryTest verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext()); log.info( "found generated segments: %s", - segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect( + segmentManager.getAllTestGeneratedDataSegments().stream().map(s -> s.toString()).collect( Collectors.joining("\n")) ); // check if segments are created if (!expectedResultRows.isEmpty()) { - Assert.assertNotEquals(0, segmentManager.getAllDataSegments().size()); + Assert.assertNotEquals(0, segmentManager.getAllTestGeneratedDataSegments().size()); } String foundDataSource = null; SortedMap>> segmentIdVsOutputRowsMap = new TreeMap<>(); - for (DataSegment dataSegment : segmentManager.getAllDataSegments()) { + for (DataSegment dataSegment : segmentManager.getAllTestGeneratedDataSegments()) { //Assert shard spec class Assert.assertEquals(expectedShardSpec, dataSegment.getShardSpec().getClass()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java index 73fca53682c..5d22c08e602 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java @@ -60,7 +60,7 @@ public class MSQTestDelegateDataSegmentPusher implements DataSegmentPusher public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException { final DataSegment dataSegment = delegate.push(file, segment, useUniquePath); - segmentManager.addDataSegment(dataSegment); + segmentManager.addTestGeneratedDataSegment(dataSegment); return dataSegment; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java index 6151cb37cc2..2c0fdac3a69 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.test; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -37,24 +36,26 @@ import java.util.concurrent.ConcurrentMap; */ public class MSQTestSegmentManager { - private final ConcurrentMap dataSegments = new ConcurrentHashMap<>(); + private final ConcurrentMap testGeneratedDataSegments = new ConcurrentHashMap<>(); private final ConcurrentMap segments = new ConcurrentHashMap<>(); private final SegmentCacheManager segmentCacheManager; - private final IndexIO indexIO; final Object lock = new Object(); - public MSQTestSegmentManager(SegmentCacheManager segmentCacheManager, IndexIO indexIO) + public MSQTestSegmentManager(SegmentCacheManager segmentCacheManager) { this.segmentCacheManager = segmentCacheManager; - this.indexIO = indexIO; } - public void addDataSegment(DataSegment dataSegment) + /** + * Registers a data segment which was generated during the test run (as opposed to during setup). This is used to + * validate which segments are generated by the test. + */ + public void addTestGeneratedDataSegment(DataSegment dataSegment) { synchronized (lock) { - dataSegments.put(dataSegment.getId(), dataSegment); + testGeneratedDataSegments.put(dataSegment.getId(), dataSegment); try { segmentCacheManager.getSegmentFiles(dataSegment); @@ -65,9 +66,9 @@ public class MSQTestSegmentManager } } - public Collection getAllDataSegments() + public Collection getAllTestGeneratedDataSegments() { - return dataSegments.values(); + return testGeneratedDataSegments.values(); } public void addSegment(Segment segment) diff --git a/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java new file mode 100644 index 00000000000..d44781774ae --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; + +/** + * Contains the {@link DataSegment} and {@link Segment}. The datasegment could be null if the segment is a dummy, such + * as those created by {@link org.apache.druid.msq.input.inline.InlineInputSliceReader}. + */ +public class CompleteSegment implements Closeable +{ + @Nullable + private final DataSegment dataSegment; + private final Segment segment; + + public CompleteSegment(@Nullable DataSegment dataSegment, Segment segment) + { + this.dataSegment = dataSegment; + this.segment = segment; + } + + @Nullable + @SuppressWarnings("unused") + public DataSegment getDataSegment() + { + return dataSegment; + } + + public Segment getSegment() + { + return segment; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompleteSegment that = (CompleteSegment) o; + return Objects.equals(dataSegment, that.dataSegment) && Objects.equals(segment, that.segment); + } + + @Override + public int hashCode() + { + return Objects.hash(dataSegment, segment); + } + + @Override + public void close() throws IOException + { + segment.close(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java new file mode 100644 index 00000000000..2c256c1d2ad --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class CompleteSegmentTest +{ + @Test + public void testCloseSegment() throws IOException + { + Segment segment = mock(Segment.class); + CompleteSegment completeSegment = new CompleteSegment(null, segment); + completeSegment.close(); + verify(segment).close(); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(CompleteSegment.class) + .withNonnullFields("segment", "dataSegment") + .usingGetClass() + .verify(); + } +}