diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 87ff2c1e1c8..53100335212 100644
--- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -28,7 +28,7 @@ import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
 import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.DateTime;
@@ -82,7 +82,7 @@ public class NewestSegmentFirstPolicyBenchmark
   private int numCompactionTaskSlots;
   private Map<String, DataSourceCompactionConfig> compactionConfigs;
-  private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
+  private Map<String, SegmentTimeline> dataSources;
 
   @Setup(Level.Trial)
   public void setup()
diff --git a/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java b/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java
index 837c8770290..a8a4414614c 100644
--- a/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java
@@ -78,7 +78,7 @@ public class VersionedIntervalTimelineBenchmark
   private List<Interval> intervals;
   private List<DataSegment> segments;
-  private VersionedIntervalTimeline<String, DataSegment> timeline;
+  private SegmentTimeline timeline;
   private List<DataSegment> newSegments;
 
   @Setup
@@ -143,7 +143,7 @@ public class VersionedIntervalTimelineBenchmark
       nextMinorVersions.put(interval, (short) (numNonRootGenerations + 1));
     }
 
-    timeline = VersionedIntervalTimeline.forSegments(segments);
+    timeline = SegmentTimeline.forSegments(segments);
 
     newSegments = new ArrayList<>(200);
@@ -206,7 +206,7 @@
   @Benchmark
   public void benchAdd(Blackhole blackhole)
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments);
+    final SegmentTimeline timeline = SegmentTimeline.forSegments(segments);
     for (DataSegment newSegment : newSegments) {
       timeline.add(
           newSegment.getInterval(),
@@ -220,7 +220,7 @@
   public void benchRemove(Blackhole blackhole)
   {
     final List<DataSegment> segmentsCopy = new ArrayList<>(segments);
-    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segmentsCopy);
+    final SegmentTimeline timeline = SegmentTimeline.forSegments(segmentsCopy);
     final int numTests = (int) (segmentsCopy.size() * 0.1);
     for (int i = 0; i < numTests; i++) {
       final DataSegment segment = segmentsCopy.remove(ThreadLocalRandom.current().nextInt(segmentsCopy.size()));
diff --git a/core/src/main/java/org/apache/druid/timeline/Overshadowable.java b/core/src/main/java/org/apache/druid/timeline/Overshadowable.java
index e10b75bff86..724eaa20c14 100644
--- a/core/src/main/java/org/apache/druid/timeline/Overshadowable.java
+++ b/core/src/main/java/org/apache/druid/timeline/Overshadowable.java
@@ -27,7 +27,7 @@ package org.apache.druid.timeline;
  * An Overshadowable overshadows another if its root partition range contains that of another
 * and has a higher minorVersion. For more details, check https://github.com/apache/druid/issues/7491.
 */
-public interface Overshadowable<T extends Overshadowable>
+public interface Overshadowable<T extends Overshadowable<T>>
 {
   /**
    * Returns true if this overshadowable overshadows the other.
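The small-looking change above tightens Overshadowable's type parameter from the raw `T extends Overshadowable` to the self-referential (F-bounded) `T extends Overshadowable<T>`. A minimal sketch of what that bound buys, using a hypothetical `Versioned` interface rather than Druid's real API:

```java
// Hypothetical stand-in for an F-bounded interface such as Overshadowable<T extends Overshadowable<T>>.
interface Versioned<T extends Versioned<T>>
{
  int minorVersion();

  // Because of the recursive bound, 'other' is statically typed as T here;
  // with the raw bound 'T extends Versioned', members of T would leak raw types.
  default boolean overshadows(T other)
  {
    return minorVersion() > other.minorVersion();
  }
}

// An implementor supplies itself as the type argument.
class Chunk implements Versioned<Chunk>
{
  private final int minorVersion;

  Chunk(int minorVersion)
  {
    this.minorVersion = minorVersion;
  }

  @Override
  public int minorVersion()
  {
    return minorVersion;
  }
}
```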
diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentTimeline.java b/core/src/main/java/org/apache/druid/timeline/SegmentTimeline.java
new file mode 100644
index 00000000000..e9c737b0a92
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentTimeline.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import java.util.Comparator;
+import java.util.Iterator;
+
+/**
+ * {@link VersionedIntervalTimeline} for {@link DataSegment} objects.
+ */
+public class SegmentTimeline extends VersionedIntervalTimeline<String, DataSegment>
+{
+  public static SegmentTimeline forSegments(Iterable<DataSegment> segments)
+  {
+    return forSegments(segments.iterator());
+  }
+
+  public static SegmentTimeline forSegments(Iterator<DataSegment> segments)
+  {
+    final SegmentTimeline timeline = new SegmentTimeline();
+    VersionedIntervalTimeline.addSegments(timeline, segments);
+    return timeline;
+  }
+
+  public SegmentTimeline()
+  {
+    super(Comparator.naturalOrder());
+  }
+
+  public boolean isOvershadowed(DataSegment segment)
+  {
+    return isOvershadowed(segment.getInterval(), segment.getVersion(), segment);
+  }
+
+}
diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
index 4a29a82d738..9dba3c1c6fd 100644
--- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
+++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java
@@ -73,19 +73,6 @@ import java.util.stream.StreamSupport;
 public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshadowable<ObjectType>>
     implements TimelineLookup<VersionType, ObjectType>
 {
-  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
-  {
-    return forSegments(segments.iterator());
-  }
-
-  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
-  {
-    final VersionedIntervalTimeline<String, DataSegment> timeline =
-        new VersionedIntervalTimeline<>(Comparator.naturalOrder());
-    addSegments(timeline, segments);
-    return timeline;
-  }
-
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   // Below timelines stores only *visible* timelineEntries
@@ -106,16 +93,16 @@ public class VersionedIntervalTimeline
   private final Comparator<? super VersionType> versionComparator;
 
   // Set this to true if the client needs to skip tombstones upon lookup (like the broker)
-  private boolean skipObjectsWithNoData = false;
+  private final boolean skipObjectsWithNoData;
 
   public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator)
   {
-    this.versionComparator = versionComparator;
+    this(versionComparator, false);
   }
 
   public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator, boolean skipObjectsWithNoData)
   {
-    this(versionComparator);
+    this.versionComparator = versionComparator;
     this.skipObjectsWithNoData = skipObjectsWithNoData;
   }
diff --git a/core/src/test/java/org/apache/druid/timeline/SegmentTimelineTest.java b/core/src/test/java/org/apache/druid/timeline/SegmentTimelineTest.java
new file mode 100644
index 00000000000..0d765f24501
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/SegmentTimelineTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class SegmentTimelineTest
+{
+
+  @Test
+  public void testIsOvershadowed()
+  {
+    final SegmentTimeline timeline = SegmentTimeline.forSegments(
+        Arrays.asList(
+            createSegment("2022-01-01/2022-01-02", "v1", 0, 3),
+            createSegment("2022-01-01/2022-01-02", "v1", 1, 3),
+            createSegment("2022-01-01/2022-01-02", "v1", 2, 3),
+            createSegment("2022-01-02/2022-01-03", "v2", 0, 2),
+            createSegment("2022-01-02/2022-01-03", "v2", 1, 2)
+        )
+    );
+
+    Assert.assertFalse(
+        timeline.isOvershadowed(createSegment("2022-01-01/2022-01-02", "v1", 1, 3))
+    );
+    Assert.assertFalse(
+        timeline.isOvershadowed(createSegment("2022-01-01/2022-01-02", "v1", 2, 3))
+    );
+    Assert.assertFalse(
+        timeline.isOvershadowed(createSegment("2022-01-01/2022-01-02", "v1", 1, 4))
+    );
+    Assert.assertFalse(
+        timeline.isOvershadowed(createSegment("2022-01-01T00:00:00/2022-01-01T06:00:00", "v1", 1, 4))
+    );
+
+    Assert.assertTrue(
+        timeline.isOvershadowed(createSegment("2022-01-02/2022-01-03", "v1", 2, 4))
+    );
+    Assert.assertTrue(
+        timeline.isOvershadowed(createSegment("2022-01-02/2022-01-03", "v1", 0, 1))
+    );
+  }
+
+  private DataSegment createSegment(String interval, String version, int partitionNum, int totalNumPartitions)
+  {
+    return new DataSegment(
+        "wiki",
+        Intervals.of(interval),
+        version,
+        Collections.emptyMap(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        new NumberedShardSpec(partitionNum, totalNumPartitions),
+        0x9,
+        1L
+    );
+  }
+}
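Taken together, the new class and its test above imply the intended call pattern. A small usage sketch (not part of the patch; the segment list is assumed to come from a metadata query elsewhere):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;

public class SegmentTimelineExample
{
  // Collects the overshadowed subset of the given used segments.
  static List<DataSegment> findOvershadowed(List<DataSegment> usedSegments)
  {
    final SegmentTimeline timeline = SegmentTimeline.forSegments(usedSegments);
    final List<DataSegment> overshadowed = new ArrayList<>();
    for (DataSegment segment : usedSegments) {
      // One-argument form; replaces isOvershadowed(segment.getInterval(), segment.getVersion(), segment).
      if (timeline.isOvershadowed(segment)) {
        overshadowed.add(segment);
      }
    }
    return overshadowed;
  }
}
```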
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index eb7609c32ea..a2f93280006 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -175,7 +175,7 @@ import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.sql.calcite.rel.DruidQuery;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -957,7 +957,7 @@ public class ControllerImpl implements Controller
       if (dataSegments.isEmpty()) {
         return Optional.empty();
       } else {
-        return Optional.of(VersionedIntervalTimeline.forSegments(dataSegments));
+        return Optional.of(SegmentTimeline.forSegments(dataSegments));
       }
     };
   }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
index fe55fa1327b..5d553ffc7f1 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java
@@ -27,7 +27,7 @@ import org.apache.druid.msq.querykit.DataSegmentTimelineView;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.junit.Assert;
 import org.junit.Before;
@@ -79,15 +79,14 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
       BYTES_PER_SEGMENT
   );
 
-  private VersionedIntervalTimeline<String, DataSegment> timeline;
-  private DataSegmentTimelineView timelineView;
+  private SegmentTimeline timeline;
   private TableInputSpecSlicer slicer;
 
   @Before
   public void setUp()
   {
-    timeline = VersionedIntervalTimeline.forSegments(ImmutableList.of(SEGMENT1, SEGMENT2));
-    timelineView = (dataSource, intervals) -> {
+    timeline = SegmentTimeline.forSegments(ImmutableList.of(SEGMENT1, SEGMENT2));
+    DataSegmentTimelineView timelineView = (dataSource, intervals) -> {
       if (DATASOURCE.equals(dataSource)) {
         return Optional.of(timeline);
       } else {
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java
index 2aa91e24578..0618d22d717 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java
@@ -30,13 +30,12 @@ import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.IngestionSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -199,8 +198,7 @@ public class HadoopIngestionSpec extends IngestionSpec<DataSchema, HadoopIOConfig, HadoopTuningConfig>
-      final VersionedIntervalTimeline<String, DataSegment> timeline =
-          VersionedIntervalTimeline.forSegments(usedVisibleSegments);
+      final SegmentTimeline timeline = SegmentTimeline.forSegments(usedVisibleSegments);
       final List<WindowedDataSegment> windowedSegments = new ArrayList<>();
       for (Interval interval : ingestionSpecObj.getIntervals()) {
         final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(interval);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 8fcadc301d2..b0c4927d926 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -74,7 +74,7 @@ import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.TombstoneShardSpec;
 import org.joda.time.DateTime;
@@ -483,7 +483,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
     } else {
       // Use segment lock
       // Create a timeline to find latest segments only
-      final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
+      final SegmentTimeline timeline = SegmentTimeline.forSegments(
           segments
       );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index c521dbf3b89..4ffa3581fe3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -96,8 +96,8 @@ import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.joda.time.Duration;
@@ -735,7 +735,7 @@ public class CompactionTask extends AbstractBatchIndexTask
         segmentProvider.findSegments(toolbox.getTaskActionClient());
     segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
     final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
-    final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
+    final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = SegmentTimeline
         .forSegments(usedSegments)
         .lookup(segmentProvider.interval);
     return new NonnullPair<>(segmentFileMap, timelineSegments);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index db325f8778d..87478be6ab0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -71,8 +71,8 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.Interval;
@@ -267,7 +267,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
     // Find inputSegments overshadowed by pushedSegments
     final Set<DataSegment> allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments());
     allSegments.addAll(pushedSegments);
-    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
+    final SegmentTimeline timeline = SegmentTimeline.forSegments(allSegments);
     final Set<DataSegment> oldSegments = FluentIterable.from(timeline.findFullyOvershadowed())
                                                        .transformAndConcat(TimelineObjectHolder::getObject)
                                                        .transform(PartitionChunk::getObject)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index d10f3d3e9f6..f6262975463 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -56,8 +56,8 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.apache.druid.utils.Streams;
@@ -509,7 +509,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
       }
     }
 
-    return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval);
+    return SegmentTimeline.forSegments(usedSegments).lookup(interval);
   }
 
   public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index e1907c9dcab..b35e660b909 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -50,7 +50,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.joda.time.Interval;
@@ -241,7 +241,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
         : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
     Assert.assertFalse(newSegments.isEmpty());
     allSegments.addAll(newSegments);
-    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
+    final SegmentTimeline timeline = SegmentTimeline.forSegments(allSegments);
     final Interval timelineInterval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
     final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(
@@ -606,7 +606,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     final Collection<DataSegment> newSegments =
         getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
     Assert.assertTrue(newSegments.containsAll(oldSegments));
-    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(newSegments);
+    final SegmentTimeline timeline = SegmentTimeline.forSegments(newSegments);
     final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
     Assert.assertEquals(new HashSet<>(newSegments), visibles);
   }
@@ -663,8 +663,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     final Collection<DataSegment> afterAppendSegments =
         getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
     Assert.assertTrue(afterAppendSegments.containsAll(beforeAppendSegments));
-    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline
-        .forSegments(afterAppendSegments);
+    final SegmentTimeline timeline = SegmentTimeline.forSegments(afterAppendSegments);
     final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
     Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles);
   }
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index 5b080cd7448..b46b5ecf079 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -42,8 +42,8 @@ import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
 import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.testing.utils.SqlTestQueryHelper;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -359,7 +359,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
     if (waitForNewVersion) {
       ITRetryUtil.retryUntilTrue(
           () -> {
-            final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
+            final SegmentTimeline timeline = SegmentTimeline.forSegments(
                 coordinator.getAvailableSegments(dataSourceName)
             );
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 7f39026b712..1a5dd13addb 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -42,8 +42,8 @@ import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
 import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.testing.utils.SqlTestQueryHelper;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.testng.Assert;
 
 import java.io.IOException;
@@ -359,7 +359,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
     if (waitForNewVersion) {
       ITRetryUtil.retryUntilTrue(
           () -> {
-            final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
+            final SegmentTimeline timeline = SegmentTimeline.forSegments(
                 coordinator.getAvailableSegments(dataSourceName)
            );
diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
index 6041f2ba16d..aa1641d92be 100644
--- a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
+++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.utils.CollectionUtils;
 
@@ -56,7 +56,7 @@ public class DataSourcesSnapshot
   }
 
   public static DataSourcesSnapshot fromUsedSegmentsTimelines(
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> usedSegmentsTimelinesPerDataSource,
+      Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource,
       ImmutableMap<String, String> dataSourceProperties
   )
   {
@@ -73,8 +73,8 @@ public class DataSourcesSnapshot
   }
 
   private final Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments;
-  private final Map<String, VersionedIntervalTimeline<String, DataSegment>> usedSegmentsTimelinesPerDataSource;
-  private final ImmutableSet<SegmentId> overshadowedSegments;
+  private final Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource;
+  private final ImmutableSet<DataSegment> overshadowedSegments;
 
   public DataSourcesSnapshot(Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments)
   {
@@ -82,14 +82,14 @@ public class DataSourcesSnapshot
         dataSourcesWithAllUsedSegments,
         CollectionUtils.mapValues(
             dataSourcesWithAllUsedSegments,
-            dataSource -> VersionedIntervalTimeline.forSegments(dataSource.getSegments())
+            dataSource -> SegmentTimeline.forSegments(dataSource.getSegments())
         )
     );
   }
 
   private DataSourcesSnapshot(
       Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> usedSegmentsTimelinesPerDataSource
+      Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource
   )
   {
     this.dataSourcesWithAllUsedSegments = dataSourcesWithAllUsedSegments;
@@ -113,12 +113,12 @@ public class DataSourcesSnapshot
     return dataSourcesWithAllUsedSegments.get(dataSourceName);
   }
 
-  public Map<String, VersionedIntervalTimeline<String, DataSegment>> getUsedSegmentsTimelinesPerDataSource()
+  public Map<String, SegmentTimeline> getUsedSegmentsTimelinesPerDataSource()
   {
     return usedSegmentsTimelinesPerDataSource;
   }
 
-  public ImmutableSet<SegmentId> getOvershadowedSegments()
+  public ImmutableSet<DataSegment> getOvershadowedSegments()
   {
     return overshadowedSegments;
   }
@@ -150,20 +150,20 @@ public class DataSourcesSnapshot
   * This method should be deduplicated with {@link VersionedIntervalTimeline#findFullyOvershadowed()}: see
   * https://github.com/apache/druid/issues/8070.
   *
-   * @return overshadowed segment Ids list
+   * @return List of overshadowed segments
   */
-  private List<SegmentId> determineOvershadowedSegments()
+  private List<DataSegment> determineOvershadowedSegments()
   {
     // It's fine to add all overshadowed segments to a single collection because only
     // a small fraction of the segments in the cluster are expected to be overshadowed,
     // so building this collection shouldn't generate a lot of garbage.
-    final List<SegmentId> overshadowedSegments = new ArrayList<>();
+    final List<DataSegment> overshadowedSegments = new ArrayList<>();
     for (ImmutableDruidDataSource dataSource : dataSourcesWithAllUsedSegments.values()) {
-      VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline =
+      SegmentTimeline usedSegmentsTimeline =
           usedSegmentsTimelinesPerDataSource.get(dataSource.getName());
       for (DataSegment segment : dataSource.getSegments()) {
-        if (usedSegmentsTimeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)) {
-          overshadowedSegments.add(segment.getId());
+        if (usedSegmentsTimeline.isOvershadowed(segment)) {
+          overshadowedSegments.add(segment);
        }
      }
    }
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c8046380990..10e95074c55 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -50,8 +50,8 @@ import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -157,7 +157,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     return connector.retryWithHandle(
         handle -> {
           if (visibility == Segments.ONLY_VISIBLE) {
-            final VersionedIntervalTimeline<String, DataSegment> timeline =
+            final SegmentTimeline timeline =
                 getTimelineForIntervalsWithHandle(handle, dataSource, intervals);
             return timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
           } else {
@@ -256,7 +256,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     return identifiers;
   }
 
-  private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWithHandle(
+  private SegmentTimeline getTimelineForIntervalsWithHandle(
       final Handle handle,
      final String dataSource,
      final List<Interval> intervals
@@ -265,7 +265,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     try (final CloseableIterator<DataSegment> iterator =
         SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
                                 .retrieveUsedSegments(dataSource, intervals)) {
-      return VersionedIntervalTimeline.forSegments(iterator);
+      return SegmentTimeline.forSegments(iterator);
     }
   }
@@ -323,7 +323,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     // Find which segments are used (i.e. not overshadowed).
     final Set<DataSegment> usedSegments = new HashSet<>();
     List<TimelineObjectHolder<String, DataSegment>> segmentHolders =
-        VersionedIntervalTimeline.forSegments(segments).lookupWithIncompletePartitions(Intervals.ETERNITY);
+        SegmentTimeline.forSegments(segments).lookupWithIncompletePartitions(Intervals.ETERNITY);
     for (TimelineObjectHolder<String, DataSegment> holder : segmentHolders) {
       for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
         usedSegments.add(chunk.getObject());
diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index 492f1b5a022..9b41f61f906 100644
--- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -96,11 +96,6 @@ public interface SegmentsMetadataManager
    */
   Collection<ImmutableDruidDataSource> getImmutableDataSourcesWithAllUsedSegments();
 
-  /**
-   * Returns a set of overshadowed segment ids.
-   */
-  Set<SegmentId> getOvershadowedSegments();
-
   /**
    * Returns a snapshot of DruidDataSources and overshadowed segments
    */
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 4ab71bc4647..c74ec4e5d87 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -49,6 +49,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
@@ -558,8 +559,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval)
   {
     final List<DataSegment> unusedSegments = new ArrayList<>();
-    final VersionedIntervalTimeline<String, DataSegment> timeline =
-        VersionedIntervalTimeline.forSegments(Collections.emptyList());
+    final SegmentTimeline timeline =
+        SegmentTimeline.forSegments(Collections.emptyIterator());
 
     connector.inReadOnlyTransaction(
         (handle, status) -> {
@@ -593,15 +594,14 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   private int markNonOvershadowedSegmentsAsUsed(
       List<DataSegment> unusedSegments,
-      VersionedIntervalTimeline<String, DataSegment> timeline
+      SegmentTimeline timeline
   )
   {
     List<SegmentId> segmentIdsToMarkAsUsed = new ArrayList<>();
     for (DataSegment segment : unusedSegments) {
-      if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)) {
-        continue;
+      if (!timeline.isOvershadowed(segment)) {
+        segmentIdsToMarkAsUsed.add(segment.getId());
       }
-      segmentIdsToMarkAsUsed.add(segment.getId());
     }
 
     return markSegmentsAsUsed(segmentIdsToMarkAsUsed);
@@ -612,7 +612,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
       throws UnknownSegmentIdsException
   {
     try {
-      Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> unusedSegmentsAndTimeline = connector
+      Pair<List<DataSegment>, SegmentTimeline> unusedSegmentsAndTimeline = connector
          .inReadOnlyTransaction(
              (handle, status) -> {
                List<DataSegment> unusedSegments = retrieveUnusedSegments(dataSource, segmentIds, handle);
@@ -621,7 +621,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
                );
                try (CloseableIterator<DataSegment> usedSegmentsOverlappingUnusedSegmentsIntervals =
                         retrieveUsedSegmentsOverlappingIntervals(dataSource, unusedSegmentsIntervals, handle)) {
-                 VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
+                 SegmentTimeline timeline = SegmentTimeline.forSegments(
                      Iterators.concat(usedSegmentsOverlappingUnusedSegmentsIntervals, unusedSegments.iterator())
                  );
                  return new Pair<>(unusedSegments, timeline);
@@ -630,7 +630,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
      );
 
      List<DataSegment> unusedSegments = unusedSegmentsAndTimeline.lhs;
-     VersionedIntervalTimeline<String, DataSegment> timeline = unusedSegmentsAndTimeline.rhs;
+     SegmentTimeline timeline = unusedSegmentsAndTimeline.rhs;
      return markNonOvershadowedSegmentsAsUsed(unusedSegments, timeline);
    }
    catch (Exception e) {
@@ -800,12 +800,6 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
     return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSourcesWithAllUsedSegments();
   }
 
-  @Override
-  public Set<SegmentId> getOvershadowedSegments()
-  {
-    return getSnapshotOfDataSourcesWithAllUsedSegments().getOvershadowedSegments();
-  }
-
   @Override
   public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
   {
@@ -834,16 +828,18 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   }
 
   @Override
-  public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource,
-                                                                                                    Interval interval,
-                                                                                                    boolean requiresLatest)
+  public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+      String datasource,
+      Interval interval,
+      boolean requiresLatest
+  )
   {
     if (requiresLatest) {
       forceOrWaitOngoingDatabasePoll();
     } else {
       useLatestIfWithinDelayOrPerformNewDatabasePoll();
     }
-    VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline
+    SegmentTimeline usedSegmentsTimeline
         = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource);
     return Optional.fromNullable(usedSegmentsTimeline)
                    .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 0c3ddfc8905..13460e2695d 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -93,7 +93,7 @@ public class SqlSegmentsMetadataQuery
   * You cannot assume that segments returned by this call are actually active. Because there is some delay between
   * new segment publishing and the marking-unused of older segments, it is possible that some segments returned
   * by this call are overshadowed by other segments. To check for this, use
-  * {@link org.apache.druid.timeline.VersionedIntervalTimeline#forSegments(Iterator)}.
+  * {@link org.apache.druid.timeline.SegmentTimeline#forSegments(Iterable)}.
   *
   * This call does not return any information about realtime segments.
   *
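The javadoc fix points callers at the new entry point. A sketch of the recommended visibility check, mirroring getTimelineForIntervalsWithHandle in the IndexerSQLMetadataStorageCoordinator hunk above (handle, connector, dbTables, and jsonMapper are assumed to be members of the enclosing class):

```java
// Build a timeline over the retrieved rows, then keep only segments that are visible,
// i.e. not overshadowed by a newer version of the same interval.
private Set<DataSegment> retrieveVisibleSegments(Handle handle, String dataSource, List<Interval> intervals)
    throws IOException
{
  try (CloseableIterator<DataSegment> iterator =
           SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
                                   .retrieveUsedSegments(dataSource, intervals)) {
    return SegmentTimeline.forSegments(iterator)
                          .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
  }
}
```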
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index ab025c0da31..50239db4eca 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -394,6 +394,13 @@ public class DruidCoordinator
     return CoordinatorCompactionConfig.current(configManager);
   }
 
+  public void markSegmentsAsUnused(String datasource, Set<SegmentId> segmentIds)
+  {
+    log.debug("Marking [%d] segments of datasource [%s] as unused: %s", segmentIds.size(), datasource, segmentIds);
+    int updatedCount = segmentsMetadataManager.markSegmentsAsUnused(segmentIds);
+    log.info("Successfully marked [%d] segments of datasource [%s] as unused", updatedCount, datasource);
+  }
+
   public void markSegmentAsUnused(DataSegment segment)
   {
     log.debug("Marking segment[%s] as unused", segment.getId());
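The new batch method lets a duty issue one metadata update per datasource instead of one update per segment. A sketch of the intended call pattern (it is mirrored by the MarkAsUnusedOvershadowedSegments change further down; the coordinator reference and the overshadowed set are assumed inputs):

```java
// Group overshadowed segments by datasource, then make a single
// markSegmentsAsUnused call per datasource.
static void markAllAsUnused(DruidCoordinator coordinator, Set<DataSegment> overshadowed)
{
  final Map<String, Set<SegmentId>> unusedByDatasource = new HashMap<>();
  for (DataSegment segment : overshadowed) {
    unusedByDatasource.computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>())
                      .add(segment.getId());
  }
  unusedByDatasource.forEach(coordinator::markSegmentsAsUnused);
}
```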
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 89bc3bf8c6d..ed488607edd 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -26,7 +26,7 @@ import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 
 import javax.annotation.Nullable;
 import java.util.Arrays;
@@ -128,7 +128,7 @@ public class DruidCoordinatorRuntimeParams
   * Creates and returns a "dataSource -> VersionedIntervalTimeline[version String, DataSegment]" map with "used"
   * segments.
   */
-  public Map<String, VersionedIntervalTimeline<String, DataSegment>> getUsedSegmentsTimelinesPerDataSource()
+  public Map<String, SegmentTimeline> getUsedSegmentsTimelinesPerDataSource()
  {
    Preconditions.checkState(dataSourcesSnapshot != null, "dataSourcesSnapshot or usedSegments must be set");
    return dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource();
@@ -378,7 +378,7 @@ public class DruidCoordinatorRuntimeParams
     /** This method must be used in test code only. */
     @VisibleForTesting
     public Builder withUsedSegmentsTimelinesPerDataSourceInTest(
-        Map<String, VersionedIntervalTimeline<String, DataSegment>> usedSegmentsTimelinesPerDataSource
+        Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource
     )
     {
       this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegmentsTimelines(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 87e79b1d4b3..153b573eb10 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -45,7 +45,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -127,7 +127,7 @@ public class CompactSegments implements CoordinatorCustomDuty
     final CoordinatorStats stats = new CoordinatorStats();
     List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
     if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources =
+      Map<String, SegmentTimeline> dataSources =
           params.getUsedSegmentsTimelinesPerDataSource();
       if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
         Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
index a0d0617b401..2cbaf31d69e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
@@ -20,8 +20,7 @@
 package org.apache.druid.server.coordinator.duty;
 
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
 import java.util.List;
@@ -37,7 +36,7 @@ public interface CompactionSegmentSearchPolicy
   */
  CompactionSegmentIterator reset(
      Map<String, DataSourceCompactionConfig> compactionConfigs,
-     Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+     Map<String, SegmentTimeline> dataSources,
      Map<String, List<Interval>> skipIntervals
  );
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java
index e278a758230..1c348e55972 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java
@@ -21,21 +21,27 @@
 package org.apache.druid.server.coordinator.duty;
 
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 
-import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 
 public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
 {
+  private static final Logger log = new Logger(MarkAsUnusedOvershadowedSegments.class);
+
   private final DruidCoordinator coordinator;
 
   public MarkAsUnusedOvershadowedSegments(DruidCoordinator coordinator)
@@ -48,13 +54,20 @@ public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
   {
     // Mark as unused overshadowed segments only if we've had enough time to make sure we aren't flapping with old data.
     if (!params.coordinatorIsLeadingEnoughTimeToMarkAsUnusedOvershadowedSegements()) {
+      log.info("Skipping MarkAsUnused as coordinator is not leading enough time.");
+      return params;
+    }
+
+    final Set<DataSegment> allOvershadowedSegments = params.getDataSourcesSnapshot().getOvershadowedSegments();
+    if (allOvershadowedSegments.isEmpty()) {
+      log.info("Skipping MarkAsUnused as there are no overshadowed segments.");
       return params;
     }
 
     CoordinatorStats stats = new CoordinatorStats();
     DruidCluster cluster = params.getDruidCluster();
-    Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+    final Map<String, SegmentTimeline> timelines = new HashMap<>();
 
     for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
       for (ServerHolder serverHolder : serverHolders) {
@@ -70,31 +83,30 @@ public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
     // to prevent unpublished segments from prematurely overshadowing segments.
 
     // Mark all segments as unused in db that are overshadowed by served segments
-    for (DataSegment dataSegment : params.getUsedSegments()) {
-      VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
-      if (timeline != null
-          && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion(), dataSegment)) {
-        coordinator.markSegmentAsUnused(dataSegment);
+    final Map<String, Set<SegmentId>> datasourceToUnusedSegments = new HashMap<>();
+    for (DataSegment dataSegment : allOvershadowedSegments) {
+      SegmentTimeline timeline = timelines.get(dataSegment.getDataSource());
+      if (timeline != null && timeline.isOvershadowed(dataSegment)) {
+        datasourceToUnusedSegments.computeIfAbsent(dataSegment.getDataSource(), ds -> new HashSet<>())
+                                  .add(dataSegment.getId());
         stats.addToGlobalStat("overShadowedCount", 1);
       }
     }
+    datasourceToUnusedSegments.forEach(coordinator::markSegmentsAsUnused);
 
     return params.buildFromExisting().withCoordinatorStats(stats).build();
   }
 
   private void addSegmentsFromServer(
       ServerHolder serverHolder,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines
+      Map<String, SegmentTimeline> timelines
   )
   {
     ImmutableDruidServer server = serverHolder.getServer();
 
     for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
-      VersionedIntervalTimeline<String, DataSegment> timeline = timelines
-          .computeIfAbsent(
-              dataSource.getName(),
-              dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder())
-          );
+      SegmentTimeline timeline = timelines
+          .computeIfAbsent(dataSource.getName(), dsName -> new SegmentTimeline());
       VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 4f2f1afca58..ca694d24c8e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -48,6 +48,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedPartitionChunk;
@@ -103,7 +104,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   NewestSegmentFirstIterator(
       ObjectMapper objectMapper,
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, SegmentTimeline> dataSources,
       Map<String, List<Interval>> skipIntervals
   )
   {
@@ -111,7 +112,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
     this.compactionConfigs = compactionConfigs;
     this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
 
-    dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> {
+    dataSources.forEach((String dataSource, SegmentTimeline timeline) -> {
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
       Granularity configuredSegmentGranularity = null;
       if (config != null && !timeline.isEmpty()) {
@@ -121,7 +122,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
         Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
         configuredSegmentGranularity = config.getGranularitySpec().getSegmentGranularity();
         // Create a new timeline to hold segments in the new configured segment granularity
-        VersionedIntervalTimeline<String, DataSegment> timelineWithConfiguredSegmentGranularity = new VersionedIntervalTimeline<>(Comparator.naturalOrder());
+        SegmentTimeline timelineWithConfiguredSegmentGranularity = new SegmentTimeline();
         Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
         for (DataSegment segment : segments) {
           // Convert original segmentGranularity to new granularities bucket by configuredSegmentGranularity
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
index d01170157ed..8306fab85e0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
@@ -21,8 +21,7 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
 import java.util.List;
@@ -43,7 +42,7 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
   @Override
   public CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, SegmentTimeline> dataSources,
       Map<String, List<Interval>> skipIntervals
   )
   {
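With both policy interfaces switched over, the caller-side flow is unchanged apart from the map's value type. A sketch, assuming (as the NewestSegmentFirstIterator signature above suggests) that CompactionSegmentIterator yields lists of segments to compact together:

```java
// Drive the search policy with the SegmentTimeline-keyed map from the snapshot.
CompactionSegmentIterator candidates = policy.reset(compactionConfigs, dataSources, skipIntervals);
while (candidates.hasNext()) {
  List<DataSegment> segmentsToCompact = candidates.next();
  // ... submit a compaction task covering segmentsToCompact ...
}
```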
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
index cf285891f61..6a79fd9dcaf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
@@ -89,7 +89,7 @@ public class RunRules implements CoordinatorDuty
     // eventually will be unloaded from Historical servers. Segments overshadowed by *served* used segments are marked
     // as unused in MarkAsUnusedOvershadowedSegments, and then eventually Coordinator sends commands to Historical nodes
     // to unload such segments in UnloadUnusedSegments.
-    Set<SegmentId> overshadowed = params.getDataSourcesSnapshot().getOvershadowedSegments();
+    Set<DataSegment> overshadowed = params.getDataSourcesSnapshot().getOvershadowedSegments();
 
     for (String tier : cluster.getTierNames()) {
       replicatorThrottler.updateReplicationState(tier);
@@ -122,7 +122,7 @@ public class RunRules implements CoordinatorDuty
     }
 
     for (DataSegment segment : params.getUsedSegments()) {
-      if (overshadowed.contains(segment.getId())) {
+      if (overshadowed.contains(segment)) {
         // Skipping overshadowed segments
         continue;
       }
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 148299b30a2..ade8ab992e4 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -182,10 +182,10 @@ public class MetadataResource
     final Stream<DataSegment> usedSegments = dataSourcesWithUsedSegments
         .stream()
         .flatMap(t -> t.getSegments().stream());
-    final Set<SegmentId> overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments();
+    final Set<DataSegment> overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments();
 
     final Stream<SegmentWithOvershadowedStatus> usedSegmentsWithOvershadowedStatus = usedSegments
-        .map(segment -> new SegmentWithOvershadowedStatus(segment, overshadowedSegments.contains(segment.getId())));
+        .map(segment -> new SegmentWithOvershadowedStatus(segment, overshadowedSegments.contains(segment)));
 
     final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections
         .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index d201bb8ffba..b0a92fb7fb2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -86,6 +86,7 @@ import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -176,7 +177,7 @@ public class CompactSegmentsTest
   private final PartitionsSpec partitionsSpec;
   private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory;
 
-  private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
+  private Map<String, SegmentTimeline> dataSources;
   Map<String, List<DataSegment>> datasourceToSegments = new HashMap<>();
 
   public CompactSegmentsTest(PartitionsSpec partitionsSpec, BiFunction<Integer, Integer, ShardSpec> shardSpecFactory)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java
index 301b3bd8f9f..6db88b86a27 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java
@@ -117,8 +117,7 @@ public class MarkAsUnusedOvershadowedSegmentsTest
            .andReturn(ImmutableSet.of(segmentV1, segmentV2))
            .anyTimes();
     EasyMock.expect(druidDataSource.getName()).andReturn("test").anyTimes();
-    coordinator.markSegmentAsUnused(segmentV1);
-    coordinator.markSegmentAsUnused(segmentV0);
+    coordinator.markSegmentsAsUnused("test", ImmutableSet.of(segmentV1.getId(), segmentV0.getId()));
     EasyMock.expectLastCall();
     EasyMock.replay(mockPeon, coordinator, druidServer, druidDataSource);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 2ae82a5f11d..078a538f25d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -55,7 +55,7 @@ import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.assertj.core.api.Assertions;
@@ -315,7 +315,7 @@ public void testClearSegmentsToCompactWhenSkippingSegments()
   {
     final long inputSegmentSizeBytes = 800000;
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-12-03T00:00:00/2017-12-04T00:00:00"),
             new Period("P1D"),
@@ -359,7 +359,7 @@
   @Test
   public void testIfFirstSegmentIsInSkipOffset()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-12-02T14:00:00/2017-12-03T00:00:00"),
             new Period("PT5H"),
@@ -380,7 +380,7 @@
   @Test
   public void testIfFirstSegmentOverlapsSkipOffset()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"),
             new Period("PT5H"),
@@ -401,7 +401,7 @@
   @Test
   public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityEqual()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("P1D")),
         new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
     );
@@ -424,7 +424,7 @@
   @Test
   public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityLarger()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         // This contains segment that
         // - Cross between month boundary of latest month (starts in Nov and ends in Dec). This should be skipped
         // - Fully in latest month (starts in Dec and ends in Dec). This should be skipped
@@ -454,7 +454,7 @@
   @Test
   public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularitySmaller()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")),
         new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
     );
@@ -563,7 +563,7 @@
   @Test
   public void testIteratorReturnsSegmentsInConfiguredSegmentGranularity()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         // Segments with day interval from Oct to Dec
         new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
     );
@@ -611,7 +611,7 @@
   @Test
   public void testIteratorReturnsSegmentsInMultipleIntervalIfConfiguredSegmentGranularityCrossBoundary()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(Intervals.of("2020-01-01/2020-01-08"), new Period("P7D")),
         new SegmentGenerateSpec(Intervals.of("2020-01-28/2020-02-03"), new Period("P7D")),
         new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
@@ -648,7 +648,7 @@
   @Test
   public void testIteratorDoesNotReturnCompactedInterval()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
     );
@@ -670,7 +670,7 @@
   @Test
   public void testIteratorReturnsAllMixedVersionSegmentsInConfiguredSegmentGranularity()
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), "1994-04-29T00:00:00.000Z", null),
         new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
     );
@@ -703,7 +703,7 @@
     PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
@@ -736,7 +736,7 @@
     PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
@@ -769,7 +769,7 @@
     PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
@@ -812,7 +812,7 @@
     PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
@@ -855,7 +855,7 @@
     PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
@@ -907,7 +907,7 @@
     PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
@@ -961,7 +961,7 @@
     // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
     // rollup=true for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
     // and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (queryGranularity was not set during last compaction)
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
@@ -1021,7 +1021,7 @@
     // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
     // queryGranularity=MINUTE for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
     // and queryGranularity=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (queryGranularity was not set during last compaction)
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
@@ -1082,7 +1082,7 @@
     // Dimensions=["foo"] for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
     // Dimensions=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (dimensions was not set during last compaction)
     // and dimensionsSpec=null for interval 2017-10-04T00:00:00/2017-10-05T00:00:00 (dimensionsSpec was not set during last compaction)
-    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+    final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), @@ -1181,7 +1181,7 @@ public class NewestSegmentFirstPolicyTest // filter=SelectorDimFilter("dim1", "bar", null) for interval 2017-10-02T00:00:00/2017-10-03T00:00:00, // filter=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (filter was not set during last compaction) // and transformSpec=null for interval 2017-10-04T00:00:00/2017-10-05T00:00:00 (transformSpec was not set during last compaction) - final VersionedIntervalTimeline timeline = createTimeline( + final SegmentTimeline timeline = createTimeline( new SegmentGenerateSpec( Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), @@ -1305,7 +1305,7 @@ public class NewestSegmentFirstPolicyTest // metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")} for interval 2017-10-02T00:00:00/2017-10-03T00:00:00, // metricsSpec=[] for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (filter was not set during last compaction) // and metricsSpec=null for interval 2017-10-04T00:00:00/2017-10-05T00:00:00 (transformSpec was not set during last compaction) - final VersionedIntervalTimeline timeline = createTimeline( + final SegmentTimeline timeline = createTimeline( new SegmentGenerateSpec( Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), @@ -1414,7 +1414,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline() { - final VersionedIntervalTimeline timeline = createTimeline( + final SegmentTimeline timeline = createTimeline( new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), "1994-04-29T00:00:00.000Z", null), new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null) ); @@ -1450,7 +1450,7 @@ public class NewestSegmentFirstPolicyTest PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY - final VersionedIntervalTimeline timeline = createTimeline( + final SegmentTimeline timeline = createTimeline( new SegmentGenerateSpec( Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), new Period("P1D"), @@ -1497,7 +1497,7 @@ public class NewestSegmentFirstPolicyTest { NullHandling.initializeForTests(); PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); - final VersionedIntervalTimeline timeline = createTimeline( + final SegmentTimeline timeline = createTimeline( new SegmentGenerateSpec( Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), @@ -1629,9 +1629,7 @@ public class NewestSegmentFirstPolicyTest } } - private static VersionedIntervalTimeline createTimeline( - SegmentGenerateSpec... specs - ) + private static SegmentTimeline createTimeline(SegmentGenerateSpec... 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index 43a96d60070..e29c2ebbf8d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -145,12 +145,6 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
     return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSourcesWithAllUsedSegments();
   }
 
-  @Override
-  public Set<SegmentId> getOvershadowedSegments()
-  {
-    return getSnapshotOfDataSourcesWithAllUsedSegments().getOvershadowedSegments();
-  }
-
   @Override
   public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
   {
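The override removed here appears to track getOvershadowedSegments() being dropped from the SegmentsMetadataManager interface elsewhere in this patch; overshadow status is instead read off the DataSourcesSnapshot, whose getOvershadowedSegments() now returns Set<DataSegment> rather than Set<SegmentId>. The direct contains checks in RunRules and MetadataResource stay correct because DataSegment defines equals and hashCode in terms of its SegmentId. A sketch of the resulting call-site shape (the snapshot and usedSegments names follow the diffs above; the loop body is illustrative):

    // After this change: membership checks compare segments directly,
    // with no segment.getId() indirection at the call site.
    final Set<DataSegment> overshadowed = dataSourcesSnapshot.getOvershadowedSegments();
    for (DataSegment segment : usedSegments) {
      if (overshadowed.contains(segment)) {
        continue; // skip overshadowed segments, as RunRules does
      }
      // ... run rules / report status only for visible segments ...
    }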