diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java index c327ec340ae..04b08f9346a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java @@ -29,10 +29,10 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.querykit.DataSegmentProvider; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; @@ -70,7 +70,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider } @Override - public Supplier> fetchSegment( + public Supplier> fetchSegment( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -79,7 +79,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider // Returns Supplier instead of ResourceHolder, so the Coordinator calls and segment downloads happen // in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.) return () -> { - ResourceHolder holder = null; + ResourceHolder holder = null; while (holder == null) { holder = holders.computeIfAbsent( @@ -99,7 +99,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it * is determined that we definitely need to go out and get one. */ - private ReferenceCountingResourceHolder fetchSegmentInternal( + private ReferenceCountingResourceHolder fetchSegmentInternal( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -133,7 +133,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider final int numRows = index.getNumRows(); final long size = dataSegment.getSize(); closer.register(() -> channelCounters.addFile(numRows, size)); - return new ReferenceCountingResourceHolder<>(segment, closer); + return new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), closer); } catch (IOException | SegmentLoadingException e) { throw CloseableUtils.closeInCatch( @@ -143,13 +143,13 @@ public class TaskDataSegmentProvider implements DataSegmentProvider } } - private static class SegmentHolder implements Supplier> + private static class SegmentHolder implements Supplier> { - private final Supplier> holderSupplier; + private final Supplier> holderSupplier; private final Closeable cleanupFn; @GuardedBy("this") - private ReferenceCountingResourceHolder holder; + private ReferenceCountingResourceHolder holder; @GuardedBy("this") private boolean closing; @@ -157,7 +157,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider @GuardedBy("this") private boolean closed; - public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) + public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) { this.holderSupplier = holderSupplier; this.cleanupFn = cleanupFn; @@ -165,7 +165,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider @Override @Nullable - public ResourceHolder get() + public ResourceHolder get() { synchronized (this) { if (closing) { @@ -183,7 +183,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider // Then, return null so "fetchSegment" will try again. return null; } else if (holder == null) { - final ResourceHolder segmentHolder = holderSupplier.get(); + final ResourceHolder segmentHolder = holderSupplier.get(); holder = new ReferenceCountingResourceHolder<>( segmentHolder.get(), () -> { @@ -210,7 +210,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider } } ); - final ResourceHolder retVal = holder.increment(); + final ResourceHolder retVal = holder.increment(); // Store already-closed holder, so it disappears when the last reference is closed. holder.close(); return retVal; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java index 4b68a3bf1b0..2a863fa5525 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java @@ -45,6 +45,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.msq.util.DimensionSchemaUtils; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.column.ColumnHolder; @@ -163,7 +164,7 @@ public class ExternalInputSliceReader implements InputSliceReader signature ); return new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(segment), + () -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)), new RichSegmentDescriptor(segmentId.toDescriptor(), null) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java index ef58c7723b3..8a05ce1527e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java @@ -30,6 +30,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.InlineSegmentWrangler; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.timeline.SegmentId; @@ -74,7 +75,7 @@ public class InlineInputSliceReader implements InputSliceReader segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY), segment -> ReadableInput.segment( new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(segment), + () -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)), DUMMY_SEGMENT_DESCRIPTOR ) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java index 2b327f216f7..85f0b10718d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java @@ -32,6 +32,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.query.LookupDataSource; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.timeline.SegmentId; @@ -98,7 +99,7 @@ public class LookupInputSliceReader implements InputSliceReader throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName); } - return ResourceHolder.fromCloseable(segment); + return ResourceHolder.fromCloseable(new CompleteSegment(null, segment)); }, new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java index b9026c7b9fb..343f7994d1e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java @@ -21,6 +21,7 @@ package org.apache.druid.msq.input.table; import com.google.common.base.Preconditions; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Segment; import java.util.Objects; @@ -31,7 +32,7 @@ import java.util.function.Supplier; */ public class SegmentWithDescriptor { - private final Supplier> segmentSupplier; + private final Supplier> segmentSupplier; private final RichSegmentDescriptor descriptor; /** @@ -42,7 +43,7 @@ public class SegmentWithDescriptor * @param descriptor segment descriptor */ public SegmentWithDescriptor( - final Supplier> segmentSupplier, + final Supplier> segmentSupplier, final RichSegmentDescriptor descriptor ) { @@ -59,7 +60,7 @@ public class SegmentWithDescriptor * It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()} * is enough. */ - public ResourceHolder getOrLoad() + public ResourceHolder getOrLoad() { return segmentSupplier.get(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 91ee4a48788..232e85166a0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -21,7 +21,7 @@ package org.apache.druid.msq.querykit; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.msq.counters.ChannelCounters; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.timeline.SegmentId; import java.util.function.Supplier; @@ -35,7 +35,7 @@ public interface DataSegmentProvider *
* It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. */ - Supplier> fetchSegment( + Supplier> fetchSegment( SegmentId segmentId, ChannelCounters channelCounters, boolean isReindex diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index 05c6f36c35f..ad456a45f5b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -54,7 +54,7 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.TimeBoundaryInspector; import org.apache.druid.segment.column.RowSignature; @@ -152,8 +152,8 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (resultYielder == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); - final SegmentReference mappedSegment = mapSegment(segmentHolder.get()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final SegmentReference mappedSegment = mapSegment(segmentHolder.get().getSegment()); final Sequence rowSequence = groupingEngine.process( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index fbcae67012a..e5fa0a03d62 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -64,6 +64,7 @@ import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorHolder; @@ -245,9 +246,9 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (cursor == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); - final Segment mappedSegment = mapSegment(segmentHolder.get()); + final Segment mappedSegment = mapSegment(segmentHolder.get().getSegment()); final CursorFactory cursorFactory = mappedSegment.asCursorFactory(); if (cursorFactory == null) { throw new ISE( @@ -264,7 +265,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor // No cursors! return ReturnOrAwait.returnObject(Unit.instance()); } else { - final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get()); + final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment()); assert rowsFlushed == 0; // There's only ever one cursor when running with a segment } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java index b5141e12dc8..a9bf5e91bb4 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java @@ -43,11 +43,11 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.query.OrderBy; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.DimensionHandler; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.ColumnHolder; @@ -187,7 +187,7 @@ public class TaskDataSegmentProviderTest for (int i = 0; i < iterations; i++) { final int expectedSegmentNumber = i % NUM_SEGMENTS; final DataSegment segment = segments.get(expectedSegmentNumber); - final ListenableFuture>> f = + final ListenableFuture>> f = exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false)); testFutures.add( @@ -195,8 +195,8 @@ public class TaskDataSegmentProviderTest f, holderSupplier -> { try { - final ResourceHolder holder = holderSupplier.get(); - Assert.assertEquals(segment.getId(), holder.get().getId()); + final ResourceHolder holder = holderSupplier.get(); + Assert.assertEquals(segment.getId(), holder.get().getSegment().getId()); final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); final File expectedFile = new File( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 13dac0da6d4..7d65cea9872 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -61,15 +61,14 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.IndexBuilder; -import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexCursorFactory; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestIndex; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; @@ -86,6 +85,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.easymock.EasyMock; import org.joda.time.Interval; import org.mockito.Mockito; @@ -161,11 +161,10 @@ public class CalciteMSQTestsHelper ) ); ObjectMapper testMapper = MSQTestBase.setupObjectMapper(dummyInjector); - IndexIO indexIO = new IndexIO(testMapper, ColumnConfig.DEFAULT); SegmentCacheManager segmentCacheManager = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, testMapper) .manufacturate(cacheManagerDir); LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig(); - MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO); + MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager); config.storageDirectory = storageDir; binder.bind(DataSegmentPusher.class).toProvider(() -> new MSQTestDelegateDataSegmentPusher( new LocalDataSegmentPusher(config), @@ -206,7 +205,10 @@ public class CalciteMSQTestsHelper return mockFactory; } - private static Supplier> getSupplierForSegment(Function tempFolderProducer, SegmentId segmentId) + protected static Supplier> getSupplierForSegment( + Function tempFolderProducer, + SegmentId segmentId + ) { final QueryableIndex index; switch (segmentId.getDataSource()) { @@ -450,6 +452,13 @@ public class CalciteMSQTestsHelper { } }; - return () -> new ReferenceCountingResourceHolder<>(segment, Closer.create()); + DataSegment dataSegment = DataSegment.builder() + .dataSource(segmentId.getDataSource()) + .interval(segmentId.getInterval()) + .version(segmentId.getVersion()) + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + return () -> new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), Closer.create()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index ced9b85bdc0..6cfc37df9dd 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -134,6 +134,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexIO; @@ -197,6 +198,7 @@ import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.PruneLoadSpec; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.TombstoneShardSpec; @@ -421,7 +423,7 @@ public class MSQTestBase extends BaseCalciteQueryTest MSQSqlModule sqlModule = new MSQSqlModule(); - segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO); + segmentManager = new MSQTestSegmentManager(segmentCacheManager); BrokerClient brokerClient = mock(BrokerClient.class); List modules = ImmutableList.of( @@ -631,7 +633,10 @@ public class MSQTestBase extends BaseCalciteQueryTest } @Nonnull - private Supplier> getSupplierForSegment(Function tempFolderProducer, SegmentId segmentId) + protected Supplier> getSupplierForSegment( + Function tempFolderProducer, + SegmentId segmentId + ) { if (segmentManager.getSegment(segmentId) == null) { final QueryableIndex index; @@ -722,7 +727,14 @@ public class MSQTestBase extends BaseCalciteQueryTest }; segmentManager.addSegment(segment); } - return () -> ReferenceCountingResourceHolder.fromCloseable(segmentManager.getSegment(segmentId)); + DataSegment dataSegment = DataSegment.builder() + .dataSource(segmentId.getDataSource()) + .interval(segmentId.getInterval()) + .version(segmentId.getVersion()) + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + return () -> ReferenceCountingResourceHolder.fromCloseable(new CompleteSegment(dataSegment, segmentManager.getSegment(segmentId))); } public SelectTester testSelectQuery() @@ -1232,17 +1244,17 @@ public class MSQTestBase extends BaseCalciteQueryTest verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext()); log.info( "found generated segments: %s", - segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect( + segmentManager.getAllTestGeneratedDataSegments().stream().map(s -> s.toString()).collect( Collectors.joining("\n")) ); // check if segments are created if (!expectedResultRows.isEmpty()) { - Assert.assertNotEquals(0, segmentManager.getAllDataSegments().size()); + Assert.assertNotEquals(0, segmentManager.getAllTestGeneratedDataSegments().size()); } String foundDataSource = null; SortedMap>> segmentIdVsOutputRowsMap = new TreeMap<>(); - for (DataSegment dataSegment : segmentManager.getAllDataSegments()) { + for (DataSegment dataSegment : segmentManager.getAllTestGeneratedDataSegments()) { //Assert shard spec class Assert.assertEquals(expectedShardSpec, dataSegment.getShardSpec().getClass()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java index 73fca53682c..5d22c08e602 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java @@ -60,7 +60,7 @@ public class MSQTestDelegateDataSegmentPusher implements DataSegmentPusher public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException { final DataSegment dataSegment = delegate.push(file, segment, useUniquePath); - segmentManager.addDataSegment(dataSegment); + segmentManager.addTestGeneratedDataSegment(dataSegment); return dataSegment; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java index 6151cb37cc2..2c0fdac3a69 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.test; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -37,24 +36,26 @@ import java.util.concurrent.ConcurrentMap; */ public class MSQTestSegmentManager { - private final ConcurrentMap dataSegments = new ConcurrentHashMap<>(); + private final ConcurrentMap testGeneratedDataSegments = new ConcurrentHashMap<>(); private final ConcurrentMap segments = new ConcurrentHashMap<>(); private final SegmentCacheManager segmentCacheManager; - private final IndexIO indexIO; final Object lock = new Object(); - public MSQTestSegmentManager(SegmentCacheManager segmentCacheManager, IndexIO indexIO) + public MSQTestSegmentManager(SegmentCacheManager segmentCacheManager) { this.segmentCacheManager = segmentCacheManager; - this.indexIO = indexIO; } - public void addDataSegment(DataSegment dataSegment) + /** + * Registers a data segment which was generated during the test run (as opposed to during setup). This is used to + * validate which segments are generated by the test. + */ + public void addTestGeneratedDataSegment(DataSegment dataSegment) { synchronized (lock) { - dataSegments.put(dataSegment.getId(), dataSegment); + testGeneratedDataSegments.put(dataSegment.getId(), dataSegment); try { segmentCacheManager.getSegmentFiles(dataSegment); @@ -65,9 +66,9 @@ public class MSQTestSegmentManager } } - public Collection getAllDataSegments() + public Collection getAllTestGeneratedDataSegments() { - return dataSegments.values(); + return testGeneratedDataSegments.values(); } public void addSegment(Segment segment) diff --git a/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java new file mode 100644 index 00000000000..d44781774ae --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; + +/** + * Contains the {@link DataSegment} and {@link Segment}. The datasegment could be null if the segment is a dummy, such + * as those created by {@link org.apache.druid.msq.input.inline.InlineInputSliceReader}. + */ +public class CompleteSegment implements Closeable +{ + @Nullable + private final DataSegment dataSegment; + private final Segment segment; + + public CompleteSegment(@Nullable DataSegment dataSegment, Segment segment) + { + this.dataSegment = dataSegment; + this.segment = segment; + } + + @Nullable + @SuppressWarnings("unused") + public DataSegment getDataSegment() + { + return dataSegment; + } + + public Segment getSegment() + { + return segment; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompleteSegment that = (CompleteSegment) o; + return Objects.equals(dataSegment, that.dataSegment) && Objects.equals(segment, that.segment); + } + + @Override + public int hashCode() + { + return Objects.hash(dataSegment, segment); + } + + @Override + public void close() throws IOException + { + segment.close(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java new file mode 100644 index 00000000000..2c256c1d2ad --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class CompleteSegmentTest +{ + @Test + public void testCloseSegment() throws IOException + { + Segment segment = mock(Segment.class); + CompleteSegment completeSegment = new CompleteSegment(null, segment); + completeSegment.close(); + verify(segment).close(); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(CompleteSegment.class) + .withNonnullFields("segment", "dataSegment") + .usingGetClass() + .verify(); + } +}