Add method SegmentTimeline.addSegments (#13831)

Kashif Faraz authored 2023-02-21 23:58:01 -08:00, committed by GitHub
parent 07883e311e
commit 3a67a43c8a
5 changed files with 23 additions and 26 deletions

SegmentTimeline.java

@@ -19,6 +19,8 @@
 package org.apache.druid.timeline;
 
+import com.google.common.collect.Iterators;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -35,7 +37,7 @@ public class SegmentTimeline extends VersionedIntervalTimeline<String, DataSegment>
   public static SegmentTimeline forSegments(Iterator<DataSegment> segments)
   {
     final SegmentTimeline timeline = new SegmentTimeline();
-    VersionedIntervalTimeline.addSegments(timeline, segments);
+    timeline.addSegments(segments);
     return timeline;
   }
@@ -44,6 +46,20 @@ public class SegmentTimeline extends VersionedIntervalTimeline<String, DataSegment>
     super(Comparator.naturalOrder());
   }
 
+  public void addSegments(Iterator<DataSegment> segments)
+  {
+    addAll(
+        Iterators.transform(
+            segments,
+            segment -> new PartitionChunkEntry<>(
+                segment.getInterval(),
+                segment.getVersion(),
+                segment.getShardSpec().createChunk(segment)
+            )
+        )
+    );
+  }
+
   public boolean isOvershadowed(DataSegment segment)
   {
     return isOvershadowed(segment.getInterval(), segment.getVersion(), segment);
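
For readers unfamiliar with the API, the sketch below (not part of the diff) shows how a caller can now populate a SegmentTimeline, either through the existing forSegments factory or through the new addSegments instance method. The surrounding buildTimelineExample method and its segments parameter are illustrative assumptions; only the SegmentTimeline calls come from the code above.

    import java.util.List;
    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.SegmentTimeline;

    // Sketch only: builds a timeline from segments supplied by the caller.
    void buildTimelineExample(List<DataSegment> segments)
    {
      // One-shot factory; unchanged public API, now delegating to addSegments().
      SegmentTimeline timeline = SegmentTimeline.forSegments(segments.iterator());

      // Incremental population with the instance method added by this commit.
      SegmentTimeline incremental = new SegmentTimeline();
      incremental.addSegments(segments.iterator());

      // Either timeline can then answer overshadowing queries.
      for (DataSegment segment : segments) {
        boolean overshadowed = timeline.isOvershadowed(segment);
      }
    }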

VersionedIntervalTimeline.java

@@ -106,22 +106,6 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshadowable<ObjectType>>
     this.skipObjectsWithNoData = skipObjectsWithNoData;
   }
 
-  public static void addSegments(
-      VersionedIntervalTimeline<String, DataSegment> timeline,
-      Iterator<DataSegment> segments
-  )
-  {
-    timeline.addAll(
-        Iterators.transform(
-            segments,
-            segment -> new PartitionChunkEntry<>(
-                segment.getInterval(),
-                segment.getVersion(),
-                segment.getShardSpec().createChunk(segment)
-            )
-        ));
-  }
-
   public static <VersionType, ObjectType extends Overshadowable<ObjectType>> Iterable<ObjectType> getAllObjects(
       final List<TimelineObjectHolder<VersionType, ObjectType>> holders
   )

SqlSegmentsMetadataManager.java

@@ -50,7 +50,6 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -572,14 +571,14 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
     try (final CloseableIterator<DataSegment> iterator =
              queryTool.retrieveUsedSegments(dataSourceName, intervals)) {
-      VersionedIntervalTimeline.addSegments(timeline, iterator);
+      timeline.addSegments(iterator);
     }
 
     try (final CloseableIterator<DataSegment> iterator =
             queryTool.retrieveUnusedSegments(dataSourceName, intervals)) {
       while (iterator.hasNext()) {
         final DataSegment dataSegment = iterator.next();
-        VersionedIntervalTimeline.addSegments(timeline, Iterators.singletonIterator(dataSegment));
+        timeline.addSegments(Iterators.singletonIterator(dataSegment));
         unusedSegments.add(dataSegment);
       }
     }
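
The call-site change is the same everywhere: code that used to hand a SegmentTimeline to the static VersionedIntervalTimeline.addSegments helper now calls the instance method directly. A minimal sketch of the resulting pattern, assuming a hypothetical retrieveSegments() source that returns the same kind of CloseableIterator used above:

    import java.io.IOException;
    import org.apache.druid.java.util.common.parsers.CloseableIterator;
    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.SegmentTimeline;

    SegmentTimeline buildTimeline() throws IOException
    {
      final SegmentTimeline timeline = new SegmentTimeline();
      try (CloseableIterator<DataSegment> iterator = retrieveSegments()) {  // hypothetical segment source
        // Previously: VersionedIntervalTimeline.addSegments(timeline, iterator);
        timeline.addSegments(iterator);
      }
      return timeline;
    }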

MarkAsUnusedOvershadowedSegments.java

@@ -30,7 +30,6 @@ import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -107,7 +106,7 @@ public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
     for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
       SegmentTimeline timeline = timelines
           .computeIfAbsent(dataSource.getName(), dsName -> new SegmentTimeline());
-      VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
+      timeline.addSegments(dataSource.getSegments().iterator());
     }
   }
 }

TestUsedSegmentChecker.java

@@ -19,10 +19,9 @@
 package org.apache.druid.segment.realtime.appenderator;
 
-import com.google.common.collect.Ordering;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 
 import java.util.HashSet;
@@ -41,8 +40,8 @@ public class TestUsedSegmentChecker implements UsedSegmentChecker
   @Override
   public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
   {
-    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-    VersionedIntervalTimeline.addSegments(timeline, pushedSegments.iterator());
+    final SegmentTimeline timeline = new SegmentTimeline();
+    timeline.addSegments(pushedSegments.iterator());
 
     final Set<DataSegment> retVal = new HashSet<>();
     for (SegmentIdWithShardSpec identifier : identifiers) {