Fix message when skipping compaction (#15460)

Kashif Faraz 2023-11-30 14:57:13 +05:30 committed by GitHub
parent 31fa63e789
commit 8ddb847658
1 changed file with 49 additions and 28 deletions

@@ -36,7 +36,6 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedPartitionChunk;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -71,7 +70,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>();
   private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>();
-  private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators;
+  private final Map<String, CompactibleSegmentIterator> timelineIterators;

   // This is needed for datasource that has segmentGranularity configured
   // If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
@@ -149,10 +148,18 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
           originalTimeline = timeline;
           timeline = timelineWithConfiguredSegmentGranularity;
         }
-        final List<Interval> searchIntervals =
-            findInitialSearchInterval(dataSource, timeline, config.getSkipOffsetFromLatest(), configuredSegmentGranularity, skipIntervals.get(dataSource));
+        final List<Interval> searchIntervals = findInitialSearchInterval(
+            dataSource,
+            timeline,
+            config.getSkipOffsetFromLatest(),
+            configuredSegmentGranularity,
+            skipIntervals.get(dataSource)
+        );
         if (!searchIntervals.isEmpty()) {
-          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, originalTimeline));
+          timelineIterators.put(
+              dataSource,
+              new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
+          );
         }
       }
     });
@@ -218,41 +225,46 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   }

   /**
-   * Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
-   * which means the holder always has at least one {@link DataSegment}.
+   * Iterates compactible segments in a {@link SegmentTimeline}.
    */
-  private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
+  private static class CompactibleSegmentIterator implements Iterator<List<DataSegment>>
   {
     private final List<TimelineObjectHolder<String, DataSegment>> holders;

     @Nullable
     private final SegmentTimeline originalTimeline;

-    CompactibleTimelineObjectHolderCursor(
+    CompactibleSegmentIterator(
         SegmentTimeline timeline,
         List<Interval> totalIntervalsToSearch,
         // originalTimeline can be null if timeline was not modified
         @Nullable SegmentTimeline originalTimeline
     )
     {
-      this.holders = totalIntervalsToSearch
-          .stream()
-          .flatMap(interval -> timeline
+      this.holders = totalIntervalsToSearch.stream().flatMap(
+          interval -> timeline
               .lookup(interval)
               .stream()
               .filter(holder -> isCompactibleHolder(interval, holder))
-          )
-          .collect(Collectors.toList());
+      ).collect(Collectors.toList());
       this.originalTimeline = originalTimeline;
     }

-    private boolean isCompactibleHolder(Interval interval, TimelineObjectHolder<String, DataSegment> holder)
+    /**
+     * Checks if the {@link TimelineObjectHolder} satisfies the following:
+     * <ul>
+     * <li>It has atleast one segment.</li>
+     * <li>The interval of the segments is contained in the searchInterval.</li>
+     * <li>The total bytes across all the segments is positive.</li>
+     * </ul>
+     */
+    private boolean isCompactibleHolder(Interval searchInterval, TimelineObjectHolder<String, DataSegment> holder)
     {
       final Iterator<PartitionChunk<DataSegment>> chunks = holder.getObject().iterator();
       if (!chunks.hasNext()) {
-        return false; // There should be at least one chunk for a holder to be compactible.
+        return false;
       }
       PartitionChunk<DataSegment> firstChunk = chunks.next();
-      if (!interval.contains(firstChunk.getObject().getInterval())) {
+      if (!searchInterval.contains(firstChunk.getObject().getInterval())) {
         return false;
       }
       long partitionBytes = firstChunk.getObject().getSize();
@@ -268,10 +280,19 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       return !holders.isEmpty();
     }

+    /**
+     * Returns the next list of compactible segments in the datasource timeline.
+     * The returned list satisfies the following conditions:
+     * <ul>
+     * <li>The list is non-null and non-empty.</li>
+     * <li>The segments are present in the search interval.</li>
+     * <li>Total bytes of segments in the list is greater than zero.</li>
+     * </ul>
+     */
     @Override
     public List<DataSegment> next()
     {
-      if (holders.isEmpty()) {
+      if (!hasNext()) {
         throw new NoSuchElementException();
       }
       TimelineObjectHolder<String, DataSegment> timelineObjectHolder = holders.remove(holders.size() - 1);
@@ -302,20 +323,20 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       final DataSourceCompactionConfig config
   )
   {
-    final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor
+    final CompactibleSegmentIterator compactibleSegmentIterator
         = timelineIterators.get(dataSourceName);
-    if (compactibleTimelineObjectHolderCursor == null) {
-      log.warn("Skipping dataSource[%s] as there is no timeline for it.", dataSourceName);
+    if (compactibleSegmentIterator == null) {
+      log.warn(
+          "Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.",
+          dataSourceName
+      );
       return SegmentsToCompact.empty();
     }

     final long inputSegmentSize = config.getInputSegmentSizeBytes();

-    while (compactibleTimelineObjectHolderCursor.hasNext()) {
-      List<DataSegment> segments = compactibleTimelineObjectHolderCursor.next();
-      if (segments.isEmpty()) {
-        throw new ISE("No segment is found?");
-      }
+    while (compactibleSegmentIterator.hasNext()) {
+      List<DataSegment> segments = compactibleSegmentIterator.next();

       final SegmentsToCompact candidates = SegmentsToCompact.from(segments);
       final Interval interval = candidates.getUmbrellaInterval();
@@ -353,7 +374,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       }
     }

-    log.debug("All segments look good! Nothing to compact");
+    log.debug("No more segments to compact for datasource[%s].", dataSourceName);
     return SegmentsToCompact.empty();
   }
@@ -377,7 +398,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
    */
   private List<Interval> findInitialSearchInterval(
       String dataSourceName,
-      VersionedIntervalTimeline<String, DataSegment> timeline,
+      SegmentTimeline timeline,
       Period skipOffset,
       Granularity configuredSegmentGranularity,
       @Nullable List<Interval> skipIntervals
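
Why the ISE("No segment is found?") guard could be dropped: after this patch, CompactibleSegmentIterator filters out non-compactible holders when it is constructed, and next() throws NoSuchElementException once hasNext() returns false, so any list it returns is non-empty by construction. The following is a minimal, self-contained sketch of that pre-filtering iterator contract; the class and names here (FilteredBatchIteratorDemo, isUsable, the integer batches) are invented for illustration and are not part of the Druid code touched by this commit.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class FilteredBatchIteratorDemo
{
  /**
   * Hypothetical stand-in for the pattern CompactibleSegmentIterator follows after
   * this change: candidates are filtered once at construction time, and next()
   * throws instead of returning an empty batch, so callers need no empty check.
   */
  static class FilteredBatchIterator<T> implements Iterator<List<T>>
  {
    private final List<List<T>> batches = new ArrayList<>();

    FilteredBatchIterator(List<List<T>> candidateBatches, Predicate<List<T>> isUsable)
    {
      // Mirrors the role of isCompactibleHolder(): drop empty or otherwise
      // unusable batches up front, before iteration begins.
      for (List<T> batch : candidateBatches) {
        if (!batch.isEmpty() && isUsable.test(batch)) {
          batches.add(batch);
        }
      }
    }

    @Override
    public boolean hasNext()
    {
      return !batches.isEmpty();
    }

    @Override
    public List<T> next()
    {
      // Like the patched next(): delegate the emptiness check to hasNext()
      // and throw, rather than ever handing back an empty list.
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return batches.remove(batches.size() - 1);
    }
  }

  public static void main(String[] args)
  {
    List<List<Integer>> candidates =
        List.of(List.of(1, 2), List.<Integer>of(), List.of(3));
    FilteredBatchIterator<Integer> iterator = new FilteredBatchIterator<>(
        candidates,
        batch -> batch.stream().mapToInt(Integer::intValue).sum() > 0
    );
    while (iterator.hasNext()) {
      // No "if (batch.isEmpty()) throw ..." guard needed in the consumer loop.
      System.out.println(iterator.next());
    }
  }
}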