From 8ddb847658813ee6445f86cfce51aae4f65d3d15 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 30 Nov 2023 14:57:13 +0530 Subject: [PATCH] Fix message when skipping compaction (#15460) --- .../compact/NewestSegmentFirstIterator.java | 77 ++++++++++++------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java index f9059dca67b..2c888ad7170 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java @@ -36,7 +36,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NumberedPartitionChunk; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; @@ -71,7 +70,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator private final Map compactedSegmentStats = new HashMap<>(); private final Map skippedSegmentStats = new HashMap<>(); - private final Map timelineIterators; + private final Map timelineIterators; // This is needed for datasource that has segmentGranularity configured // If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments @@ -149,10 +148,18 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator originalTimeline = timeline; timeline = timelineWithConfiguredSegmentGranularity; } - final List searchIntervals = - findInitialSearchInterval(dataSource, timeline, 
config.getSkipOffsetFromLatest(), configuredSegmentGranularity, skipIntervals.get(dataSource)); + final List searchIntervals = findInitialSearchInterval( + dataSource, + timeline, + config.getSkipOffsetFromLatest(), + configuredSegmentGranularity, + skipIntervals.get(dataSource) + ); if (!searchIntervals.isEmpty()) { - timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, originalTimeline)); + timelineIterators.put( + dataSource, + new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline) + ); } } }); @@ -218,41 +225,46 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator } /** - * Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned, - * which means the holder always has at least one {@link DataSegment}. + * Iterates compactible segments in a {@link SegmentTimeline}. */ - private static class CompactibleTimelineObjectHolderCursor implements Iterator> + private static class CompactibleSegmentIterator implements Iterator> { private final List> holders; @Nullable private final SegmentTimeline originalTimeline; - CompactibleTimelineObjectHolderCursor( + CompactibleSegmentIterator( SegmentTimeline timeline, List totalIntervalsToSearch, // originalTimeline can be null if timeline was not modified @Nullable SegmentTimeline originalTimeline ) { - this.holders = totalIntervalsToSearch - .stream() - .flatMap(interval -> timeline + this.holders = totalIntervalsToSearch.stream().flatMap( + interval -> timeline .lookup(interval) .stream() .filter(holder -> isCompactibleHolder(interval, holder)) - ) - .collect(Collectors.toList()); + ).collect(Collectors.toList()); this.originalTimeline = originalTimeline; } - private boolean isCompactibleHolder(Interval interval, TimelineObjectHolder holder) + /** + * Checks if the {@link TimelineObjectHolder} satisfies the following: + *
<ul>
+ * <li>It has at least one segment.</li>
+ * <li>The interval of the segments is contained in the searchInterval.</li>
+ * <li>The total bytes across all the segments is positive.</li>
+ * </ul>
+ */ + private boolean isCompactibleHolder(Interval searchInterval, TimelineObjectHolder holder) { final Iterator> chunks = holder.getObject().iterator(); if (!chunks.hasNext()) { - return false; // There should be at least one chunk for a holder to be compactible. + return false; } PartitionChunk firstChunk = chunks.next(); - if (!interval.contains(firstChunk.getObject().getInterval())) { + if (!searchInterval.contains(firstChunk.getObject().getInterval())) { return false; } long partitionBytes = firstChunk.getObject().getSize(); @@ -268,10 +280,19 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator return !holders.isEmpty(); } + /** + * Returns the next list of compactible segments in the datasource timeline. + * The returned list satisfies the following conditions: + *
<ul>
+ * <li>The list is non-null and non-empty.</li>
+ * <li>The segments are present in the search interval.</li>
+ * <li>Total bytes of segments in the list is greater than zero.</li>
+ * </ul>
+ */ @Override public List next() { - if (holders.isEmpty()) { + if (!hasNext()) { throw new NoSuchElementException(); } TimelineObjectHolder timelineObjectHolder = holders.remove(holders.size() - 1); @@ -302,20 +323,20 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator final DataSourceCompactionConfig config ) { - final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor + final CompactibleSegmentIterator compactibleSegmentIterator = timelineIterators.get(dataSourceName); - if (compactibleTimelineObjectHolderCursor == null) { - log.warn("Skipping dataSource[%s] as there is no timeline for it.", dataSourceName); + if (compactibleSegmentIterator == null) { + log.warn( + "Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.", + dataSourceName + ); return SegmentsToCompact.empty(); } final long inputSegmentSize = config.getInputSegmentSizeBytes(); - while (compactibleTimelineObjectHolderCursor.hasNext()) { - List segments = compactibleTimelineObjectHolderCursor.next(); - if (segments.isEmpty()) { - throw new ISE("No segment is found?"); - } + while (compactibleSegmentIterator.hasNext()) { + List segments = compactibleSegmentIterator.next(); final SegmentsToCompact candidates = SegmentsToCompact.from(segments); final Interval interval = candidates.getUmbrellaInterval(); @@ -353,7 +374,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator } } - log.debug("All segments look good! Nothing to compact"); + log.debug("No more segments to compact for datasource[%s].", dataSourceName); return SegmentsToCompact.empty(); } @@ -377,7 +398,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator */ private List findInitialSearchInterval( String dataSourceName, - VersionedIntervalTimeline timeline, + SegmentTimeline timeline, Period skipOffset, Granularity configuredSegmentGranularity, @Nullable List skipIntervals