diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 6548db6f203..e0eecf18ea9 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -181,9 +182,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   /**
    * Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
-   * which means the holder always has at least one {@link DataSegment} and the total size of segments is larger than 0.
+   * which means the holder always has at least two {@link DataSegment}s.
    */
-  private static class CompactibleTimelineObjectHolderCursor
+  private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
   {
     private final List<TimelineObjectHolder<String, DataSegment>> holders;
 
@@ -200,7 +201,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
           .filter(holder -> {
             final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
             final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
-            return chunks.size() > 0
+            return chunks.size() > 1
                    && partitionBytes > 0
                    && interval.contains(chunks.get(0).getObject().getInterval());
           })
@@ -208,32 +209,23 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
           .collect(Collectors.toList());
     }
 
-    boolean hasNext()
+    @Override
+    public boolean hasNext()
     {
       return !holders.isEmpty();
     }
 
-    /**
-     * Returns the latest holder.
-     */
-    @Nullable
-    TimelineObjectHolder<String, DataSegment> get()
+    @Override
+    public List<DataSegment> next()
     {
       if (holders.isEmpty()) {
-        return null;
-      } else {
-        return holders.get(holders.size() - 1);
-      }
-    }
-
-    /**
-     * Removes the latest holder, so that {@link #get()} returns the next one.
-     */
-    void next()
-    {
-      if (!holders.isEmpty()) {
-        holders.remove(holders.size() - 1);
+        throw new NoSuchElementException();
       }
+      return holders.remove(holders.size() - 1)
+                    .getObject()
+                    .stream()
+                    .map(PartitionChunk::getObject)
+                    .collect(Collectors.toList());
     }
   }
 
@@ -254,103 +246,59 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   )
   {
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes();
     final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
-    final SegmentsToCompact segmentsToCompact = new SegmentsToCompact();
 
     // Finds segments to compact together while iterating timeline from latest to oldest
-    while (compactibleTimelineObjectHolderCursor.hasNext()
-           && segmentsToCompact.getTotalSize() < inputSegmentSize
-           && segmentsToCompact.getNumSegments() < maxNumSegmentsToCompact) {
-      final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
-          compactibleTimelineObjectHolderCursor.get(),
-          "timelineObjectHolder"
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
+      final boolean isCompactibleNum = candidates.getNumSegments() <= maxNumSegmentsToCompact;
+      final boolean needsCompaction = SegmentCompactorUtil.needsCompaction(
+          targetCompactionSizeBytes,
+          candidates.segments
       );
-      final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(timeChunkHolder.getObject().iterator());
-      final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
-      final boolean isSameOrAbuttingInterval;
-      final Interval lastInterval = segmentsToCompact.getIntervalOfLastSegment();
-      if (lastInterval == null) {
-        isSameOrAbuttingInterval = true;
+      if (isCompactibleSize && isCompactibleNum && needsCompaction) {
+        return candidates;
       } else {
-        final Interval currentInterval = chunks.get(0).getObject().getInterval();
-        isSameOrAbuttingInterval = currentInterval.isEqual(lastInterval) || currentInterval.abuts(lastInterval);
-      }
-
-      // The segments in a holder should be added all together or not.
-      final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
-          inputSegmentSize,
-          segmentsToCompact.getTotalSize(),
-          timeChunkSizeBytes
-      );
-      final boolean isCompactibleNum = SegmentCompactorUtil.isCompactibleNum(
-          maxNumSegmentsToCompact,
-          segmentsToCompact.getNumSegments(),
-          chunks.size()
-      );
-      if (isCompactibleSize
-          && isCompactibleNum
-          && isSameOrAbuttingInterval
-          && segmentsToCompact.isEmpty()) {
-        chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-      } else {
-        if (segmentsToCompact.getNumSegments() > 1) {
-          // We found some segmens to compact and cannot add more. End here.
-          return segmentsToCompact;
-        } else {
-          if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
-            final DataSegment segment = chunks.get(0).getObject();
-            segmentsToCompact.clear();
-            log.warn(
-                "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
- + " Continue to the next shard.", - timeChunkSizeBytes, - segment.getDataSource(), - segment.getInterval(), - inputSegmentSize - ); - } else if (maxNumSegmentsToCompact < chunks.size()) { - final DataSegment segment = chunks.get(0).getObject(); - segmentsToCompact.clear(); - log.warn( - "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than " - + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many " - + "segments, consider increasing 'numTargetCompactionSegments' and " - + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next shard.", - chunks.size(), - segment.getDataSource(), - segment.getInterval(), - maxNumSegmentsToCompact - ); - } else { - if (segmentsToCompact.getNumSegments() == 1) { - // We found a segment which is smaller than targetCompactionSize but too large to compact with other - // segments. Skip this one. - segmentsToCompact.clear(); - chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject())); - } else { - throw new ISE( - "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s] " - + "with current segmentsToCompact[%s]", - chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()), - timeChunkSizeBytes, - chunks.size(), - segmentsToCompact - ); - } - } + if (!isCompactibleSize) { + log.warn( + "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]." + + " Continue to the next interval.", + candidates.getTotalSize(), + candidates.segments.get(0).getDataSource(), + candidates.segments.get(0).getInterval(), + inputSegmentSize + ); + } + if (!isCompactibleNum) { + log.warn( + "Number of segments[%d] for datasource[%s] and interval[%s] is larger than " + + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many " + + "segments, consider increasing 'numTargetCompactionSegments' and " + + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next interval.", + candidates.getNumSegments(), + candidates.segments.get(0).getDataSource(), + candidates.segments.get(0).getInterval(), + maxNumSegmentsToCompact + ); + } + if (!needsCompaction) { + log.warn( + "Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] " + + "for datasource[%s] and interval[%s]. 
+              + "for datasource[%s] and interval[%s]. Skipping compaction for this interval.",
+              candidates.segments.stream().map(DataSegment::getSize).collect(Collectors.toList()),
+              targetCompactionSizeBytes,
+              candidates.segments.get(0).getDataSource(),
+              candidates.segments.get(0).getInterval()
+          );
         }
       }
-
-      compactibleTimelineObjectHolderCursor.next();
     }
 
-    if (segmentsToCompact.getNumSegments() == 1) {
-      // Don't compact a single segment
-      segmentsToCompact.clear();
-    }
-
-    return segmentsToCompact;
+    // Return an empty set if nothing is found
+    return new SegmentsToCompact();
   }
 
   /**
@@ -510,29 +458,18 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   private static class SegmentsToCompact
   {
-    private final List<DataSegment> segments = new ArrayList<>();
-    private long totalSize;
+    private final List<DataSegment> segments;
+    private final long totalSize;
 
-    private void add(DataSegment segment)
+    private SegmentsToCompact()
     {
-      segments.add(segment);
-      totalSize += segment.getSize();
+      this(Collections.emptyList());
    }
 
-    private boolean isEmpty()
+    private SegmentsToCompact(List<DataSegment> segments)
     {
-      Preconditions.checkState((totalSize == 0) == segments.isEmpty());
-      return segments.isEmpty();
-    }
-
-    @Nullable
-    private Interval getIntervalOfLastSegment()
-    {
-      if (segments.isEmpty()) {
-        return null;
-      } else {
-        return segments.get(segments.size() - 1).getInterval();
-      }
+      this.segments = segments;
+      this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
     }
 
     private int getNumSegments()
@@ -545,12 +482,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       return totalSize;
     }
 
-    private void clear()
-    {
-      segments.clear();
-      totalSize = 0;
-    }
-
     @Override
     public String toString()
     {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
index 08a651e1b87..d68c3a0d40b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
@@ -20,21 +20,36 @@
 package org.apache.druid.server.coordinator.helper;
 
 import com.google.common.base.Preconditions;
+import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+import java.util.List;
+
 /**
  * Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}.
  */
 class SegmentCompactorUtil
 {
-  static boolean isCompactibleSize(long targetBytes, long currentTotalBytes, long additionalBytes)
-  {
-    return currentTotalBytes + additionalBytes <= targetBytes;
-  }
+  /**
+   * The allowed error rate of the segment size after compaction.
+   * Its value is determined experimentally.
+   */
+  private static final double ALLOWED_ERROR_OF_SEGMENT_SIZE = .2;
 
-  static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, int numAdditionalSegments)
+  static boolean needsCompaction(@Nullable Long targetCompactionSizeBytes, List<DataSegment> candidates)
   {
-    return numCurrentSegments + numAdditionalSegments <= numTargetSegments;
+    if (targetCompactionSizeBytes == null) {
+      // If targetCompactionSizeBytes is null, we have no way to check whether the given segments need compaction or not.
+      return true;
+    }
+    final double minTargetThreshold = targetCompactionSizeBytes * (1 - ALLOWED_ERROR_OF_SEGMENT_SIZE);
+    final double maxTargetThreshold = targetCompactionSizeBytes * (1 + ALLOWED_ERROR_OF_SEGMENT_SIZE);
+
+    return candidates
+        .stream()
+        .filter(segment -> segment.getSize() < minTargetThreshold || segment.getSize() > maxTargetThreshold)
+        .count() > 1;
   }
 
   /**
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 0eb8b39ccc0..fcc1053b7de 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -38,7 +38,6 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.ShardSpec;
@@ -79,18 +78,6 @@ public class DruidCoordinatorSegmentCompactorTest
         segments.get(0).getInterval().getStart(),
         segments.get(segments.size() - 1).getInterval().getEnd()
     );
-    DataSegment compactSegment = new DataSegment(
-        segments.get(0).getDataSource(),
-        compactInterval,
-        "newVersion_" + compactVersionSuffix++,
-        null,
-        segments.get(0).getDimensions(),
-        segments.get(0).getMetrics(),
-        NoneShardSpec.instance(),
-        1,
-        segments.stream().mapToLong(DataSegment::getSize).sum()
-    );
-
     final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(segments.get(0).getDataSource());
     segments.forEach(
         segment -> timeline.remove(
@@ -99,11 +86,28 @@ public class DruidCoordinatorSegmentCompactorTest
             segment.getShardSpec().createChunk(segment)
         )
     );
-    timeline.add(
-        compactInterval,
-        compactSegment.getVersion(),
-        compactSegment.getShardSpec().createChunk(compactSegment)
-    );
+    final String version = "newVersion_" + compactVersionSuffix++;
+    final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
+    for (int i = 0; i < 2; i++) {
+      DataSegment compactSegment = new DataSegment(
+          segments.get(0).getDataSource(),
+          compactInterval,
+          version,
+          null,
+          segments.get(0).getDimensions(),
+          segments.get(0).getMetrics(),
+          new NumberedShardSpec(i, 0),
+          1,
+          segmentSize
+      );
+
+      timeline.add(
+          compactInterval,
+          compactSegment.getVersion(),
+          compactSegment.getShardSpec().createChunk(compactSegment)
+      );
+    }
 
     return "task_" + idSuffix++;
   }
@@ -129,7 +133,7 @@ public class DruidCoordinatorSegmentCompactorTest
     for (int i = 0; i < 3; i++) {
       final String dataSource = DATA_SOURCE_PREFIX + i;
       for (int j : new int[] {0, 1, 2, 3, 7, 8}) {
-        for (int k = 0; k < 2; k++) {
+        for (int k = 0; k < 4; k++) {
           segments.add(createSegment(dataSource, j, true, k));
           segments.add(createSegment(dataSource, j, false, k));
         }
@@ -187,7 +191,7 @@ public class DruidCoordinatorSegmentCompactorTest
       }
     };
     int expectedCompactTaskCount = 1;
-    int expectedRemainingSegments = 200;
+    int expectedRemainingSegments = 400;
 
     // compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
     assertCompactSegments(
         compactor,
         Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
         expectedRemainingSegments,
         expectedCompactTaskCount,
         expectedVersionSupplier
     );
-    expectedRemainingSegments -= 20;
+    expectedRemainingSegments -= 40;
     assertCompactSegments(
         compactor,
         Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
         expectedRemainingSegments,
         expectedCompactTaskCount,
         expectedVersionSupplier
     );
@@ -207,7 +211,7 @@ public class DruidCoordinatorSegmentCompactorTest
 
     // compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
-    expectedRemainingSegments -= 20;
+    expectedRemainingSegments -= 40;
     assertCompactSegments(
         compactor,
         Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
@@ -215,7 +219,7 @@ public class DruidCoordinatorSegmentCompactorTest
         expectedCompactTaskCount,
         expectedVersionSupplier
     );
-    expectedRemainingSegments -= 20;
+    expectedRemainingSegments -= 40;
     assertCompactSegments(
         compactor,
         Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5),
@@ -225,7 +229,7 @@ public class DruidCoordinatorSegmentCompactorTest
     );
 
     for (int endDay = 4; endDay > 1; endDay -= 1) {
-      expectedRemainingSegments -= 20;
+      expectedRemainingSegments -= 40;
       assertCompactSegments(
           compactor,
           Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay),
@@ -233,7 +237,7 @@ public class DruidCoordinatorSegmentCompactorTest
           expectedCompactTaskCount,
           expectedVersionSupplier
       );
-      expectedRemainingSegments -= 20;
+      expectedRemainingSegments -= 40;
       assertCompactSegments(
           compactor,
           Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
@@ -296,10 +300,12 @@ public class DruidCoordinatorSegmentCompactorTest
       List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(expectedInterval);
       Assert.assertEquals(1, holders.size());
       List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holders.get(0).getObject());
-      Assert.assertEquals(1, chunks.size());
-      DataSegment segment = chunks.get(0).getObject();
-      Assert.assertEquals(expectedInterval, segment.getInterval());
-      Assert.assertEquals(expectedVersionSupplier.get(), segment.getVersion());
+      Assert.assertEquals(2, chunks.size());
+      final String expectedVersion = expectedVersionSupplier.get();
+      for (PartitionChunk<DataSegment> chunk : chunks) {
+        Assert.assertEquals(expectedInterval, chunk.getObject().getInterval());
+        Assert.assertEquals(expectedVersion, chunk.getObject().getVersion());
+      }
     }
   }
 
@@ -313,7 +319,7 @@ public class DruidCoordinatorSegmentCompactorTest
       Assert.assertEquals(1, holders.size());
       for (TimelineObjectHolder<String, DataSegment> holder : holders) {
         List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
-        Assert.assertEquals(2, chunks.size());
+        Assert.assertEquals(4, chunks.size());
         for (PartitionChunk<DataSegment> chunk : chunks) {
           DataSegment segment = chunk.getObject();
           Assert.assertEquals(interval, segment.getInterval());
@@ -369,7 +375,7 @@ public class DruidCoordinatorSegmentCompactorTest
           dataSource,
           0,
           50L,
-          50L,
+          20L,
           null,
           null,
           new Period("PT1H"), // smaller than segment interval
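The cursor change above replaces the old nullable get()/next() pair with the standard java.util.Iterator contract. A minimal standalone sketch of that pattern for reference; the class name NewestFirstCursorSketch and the use of List<String> in place of timeline holders are illustrative assumptions, not part of this patch:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Mimics the new CompactibleTimelineObjectHolderCursor contract: a plain Iterator that
    // pops whole time chunks from the tail (newest end) of a list sorted oldest-to-newest.
    class NewestFirstCursorSketch implements Iterator<List<String>>
    {
      private final List<List<String>> holders;

      NewestFirstCursorSketch(List<List<String>> holders)
      {
        this.holders = new ArrayList<>(holders);
      }

      @Override
      public boolean hasNext()
      {
        return !holders.isEmpty();
      }

      @Override
      public List<String> next()
      {
        if (holders.isEmpty()) {
          // Iterator contract: fail fast instead of returning null like the old get()
          throw new NoSuchElementException();
        }
        return holders.remove(holders.size() - 1);
      }

      public static void main(String[] args)
      {
        Iterator<List<String>> cursor = new NewestFirstCursorSketch(Arrays.asList(
            Arrays.asList("2017-01-01-seg0", "2017-01-01-seg1"),
            Arrays.asList("2017-01-02-seg0", "2017-01-02-seg1")
        ));
        while (cursor.hasNext()) {
          System.out.println(cursor.next()); // newest day prints first
        }
      }
    }

Popping from the tail keeps newest-first ordering without re-sorting, and callers consume one whole time chunk per next() call, which is what lets findSegmentsToCompact() evaluate each interval's candidates as a unit.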
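The new SegmentCompactorUtil.needsCompaction() skips intervals whose segments already sit near the target size. A self-contained sketch of the same band check under simplified assumptions (plain long sizes instead of DataSegment, and the hypothetical class name SizeBandCheckSketch):

    import java.util.Arrays;
    import java.util.List;

    class SizeBandCheckSketch
    {
      // Same constant as the patch: sizes within +/-20% of the target count as compacted.
      private static final double ALLOWED_ERROR_OF_SEGMENT_SIZE = .2;

      static boolean needsCompaction(Long targetCompactionSizeBytes, List<Long> segmentSizes)
      {
        if (targetCompactionSizeBytes == null) {
          // No target size means there is no way to tell, so assume compaction is needed
          return true;
        }
        final double minTargetThreshold = targetCompactionSizeBytes * (1 - ALLOWED_ERROR_OF_SEGMENT_SIZE);
        final double maxTargetThreshold = targetCompactionSizeBytes * (1 + ALLOWED_ERROR_OF_SEGMENT_SIZE);
        // Recompact only when more than one segment falls outside the band
        return segmentSizes.stream()
                           .filter(size -> size < minTargetThreshold || size > maxTargetThreshold)
                           .count() > 1;
      }

      public static void main(String[] args)
      {
        System.out.println(needsCompaction(100L, Arrays.asList(95L, 110L))); // false: both within [80, 120]
        System.out.println(needsCompaction(100L, Arrays.asList(30L, 40L))); // true: two segments below 80
      }
    }

With targetCompactionSizeBytes = 100 and ALLOWED_ERROR_OF_SEGMENT_SIZE = 0.2, the accepted band is [80, 120]; a single stray segment is tolerated, so an interval is recompacted only when at least two segments fall outside it.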