Check targetCompactionSizeBytes to search for candidate segments in auto compaction (#8495)

* Check targetCompactionSizeBytes to search for candidate segments in auto compaction

* fix logs

* add javadoc

* rename
Jihoon Son 2019-09-09 23:11:08 -07:00 committed by Fangjin Yang
parent 5f61374cb3
commit 762f4d0e58
3 changed files with 122 additions and 170 deletions
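
For orientation, the candidate check added by this change can be summarized with the following standalone sketch. It is illustrative only: the class and method names are made up, and it collapses logic that the diff below spreads across NewestSegmentFirstIterator and SegmentCompactorUtil. A time chunk (which the cursor only yields if it has at least two segments) becomes a candidate when its total size and segment count fit the configured limits and more than one of its segments falls outside a tolerance band around targetCompactionSizeBytes.

// Hypothetical, self-contained sketch of the candidate check; not the actual Druid classes.
import java.util.List;

class CandidateCheckSketch
{
  // Mirrors ALLOWED_ERROR_OF_SEGMENT_SIZE in SegmentCompactorUtil (20%).
  private static final double SEGMENT_SIZE_TOLERANCE = 0.2;

  static boolean isCandidateForCompaction(
      List<Long> segmentSizes,          // sizes of the segments in one time chunk
      long inputSegmentSizeBytes,       // max total bytes per compaction task
      int maxNumSegmentsToCompact,      // max segment count per compaction task
      Long targetCompactionSizeBytes    // desired compacted segment size; may be null
  )
  {
    final long totalSize = segmentSizes.stream().mapToLong(Long::longValue).sum();
    if (totalSize > inputSegmentSizeBytes || segmentSizes.size() > maxNumSegmentsToCompact) {
      return false; // too large or too many segments to compact in a single task
    }
    if (targetCompactionSizeBytes == null) {
      return true; // no target size configured; keep the previous behavior
    }
    final double min = targetCompactionSizeBytes * (1 - SEGMENT_SIZE_TOLERANCE);
    final double max = targetCompactionSizeBytes * (1 + SEGMENT_SIZE_TOLERANCE);
    // Compact only if more than one segment lies outside the tolerated size band.
    return segmentSizes.stream().filter(s -> s < min || s > max).count() > 1;
  }
}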

NewestSegmentFirstIterator.java

@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -181,9 +182,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
/**
* Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
* which means the holder always has at least one {@link DataSegment} and the total size of segments is larger than 0.
* which means the holder always has at least two {@link DataSegment}s.
*/
private static class CompactibleTimelineObjectHolderCursor
private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
@@ -200,7 +201,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.filter(holder -> {
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
return chunks.size() > 0
return chunks.size() > 1
&& partitionBytes > 0
&& interval.contains(chunks.get(0).getObject().getInterval());
})
@@ -208,32 +209,23 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.collect(Collectors.toList());
}
boolean hasNext()
@Override
public boolean hasNext()
{
return !holders.isEmpty();
}
/**
* Returns the latest holder.
*/
@Nullable
TimelineObjectHolder<String, DataSegment> get()
@Override
public List<DataSegment> next()
{
if (holders.isEmpty()) {
return null;
} else {
return holders.get(holders.size() - 1);
}
}
/**
* Removes the latest holder, so that {@link #get()} returns the next one.
*/
void next()
{
if (!holders.isEmpty()) {
holders.remove(holders.size() - 1);
throw new NoSuchElementException();
}
return holders.remove(holders.size() - 1)
.getObject()
.stream()
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
}
}
@@ -254,103 +246,59 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
)
{
final long inputSegmentSize = config.getInputSegmentSizeBytes();
final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes();
final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
final SegmentsToCompact segmentsToCompact = new SegmentsToCompact();
// Finds segments to compact together while iterating timeline from latest to oldest
while (compactibleTimelineObjectHolderCursor.hasNext()
&& segmentsToCompact.getTotalSize() < inputSegmentSize
&& segmentsToCompact.getNumSegments() < maxNumSegmentsToCompact) {
final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
compactibleTimelineObjectHolderCursor.get(),
"timelineObjectHolder"
while (compactibleTimelineObjectHolderCursor.hasNext()) {
final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
final boolean isCompactibleNum = candidates.getNumSegments() <= maxNumSegmentsToCompact;
final boolean needsCompaction = SegmentCompactorUtil.needsCompaction(
targetCompactionSizeBytes,
candidates.segments
);
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(timeChunkHolder.getObject().iterator());
final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
final boolean isSameOrAbuttingInterval;
final Interval lastInterval = segmentsToCompact.getIntervalOfLastSegment();
if (lastInterval == null) {
isSameOrAbuttingInterval = true;
if (isCompactibleSize && isCompactibleNum && needsCompaction) {
return candidates;
} else {
final Interval currentInterval = chunks.get(0).getObject().getInterval();
isSameOrAbuttingInterval = currentInterval.isEqual(lastInterval) || currentInterval.abuts(lastInterval);
}
// The segments in a holder should be added all together or not.
final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
inputSegmentSize,
segmentsToCompact.getTotalSize(),
timeChunkSizeBytes
);
final boolean isCompactibleNum = SegmentCompactorUtil.isCompactibleNum(
maxNumSegmentsToCompact,
segmentsToCompact.getNumSegments(),
chunks.size()
);
if (isCompactibleSize
&& isCompactibleNum
&& isSameOrAbuttingInterval
&& segmentsToCompact.isEmpty()) {
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
} else {
if (segmentsToCompact.getNumSegments() > 1) {
// We found some segments to compact and cannot add more. End here.
return segmentsToCompact;
} else {
if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
final DataSegment segment = chunks.get(0).getObject();
segmentsToCompact.clear();
log.warn(
"shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next shard.",
timeChunkSizeBytes,
segment.getDataSource(),
segment.getInterval(),
inputSegmentSize
);
} else if (maxNumSegmentsToCompact < chunks.size()) {
final DataSegment segment = chunks.get(0).getObject();
segmentsToCompact.clear();
log.warn(
"The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
+ "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
+ "segments, consider increasing 'numTargetCompactionSegments' and "
+ "'druid.indexer.runner.maxZnodeBytes'. Continue to the next shard.",
chunks.size(),
segment.getDataSource(),
segment.getInterval(),
maxNumSegmentsToCompact
);
} else {
if (segmentsToCompact.getNumSegments() == 1) {
// We found a segment which is smaller than targetCompactionSize but too large to compact with other
// segments. Skip this one.
segmentsToCompact.clear();
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
} else {
throw new ISE(
"Cannot compact segments[%s]. shardBytes[%s], numSegments[%s] "
+ "with current segmentsToCompact[%s]",
chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
timeChunkSizeBytes,
chunks.size(),
segmentsToCompact
);
}
}
if (!isCompactibleSize) {
log.warn(
"total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next interval.",
candidates.getTotalSize(),
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval(),
inputSegmentSize
);
}
if (!isCompactibleNum) {
log.warn(
"Number of segments[%d] for datasource[%s] and interval[%s] is larger than "
+ "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
+ "segments, consider increasing 'numTargetCompactionSegments' and "
+ "'druid.indexer.runner.maxZnodeBytes'. Continue to the next interval.",
candidates.getNumSegments(),
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval(),
maxNumSegmentsToCompact
);
}
if (!needsCompaction) {
log.warn(
"Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] "
+ "for datasource[%s] and interval[%s]. Skipping compaction for this interval.",
candidates.segments.stream().map(DataSegment::getSize).collect(Collectors.toList()),
targetCompactionSizeBytes,
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval()
);
}
}
compactibleTimelineObjectHolderCursor.next();
}
if (segmentsToCompact.getNumSegments() == 1) {
// Don't compact a single segment
segmentsToCompact.clear();
}
return segmentsToCompact;
// Return an empty set if nothing is found
return new SegmentsToCompact();
}
/**
@@ -510,29 +458,18 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private static class SegmentsToCompact
{
private final List<DataSegment> segments = new ArrayList<>();
private long totalSize;
private final List<DataSegment> segments;
private final long totalSize;
private void add(DataSegment segment)
private SegmentsToCompact()
{
segments.add(segment);
totalSize += segment.getSize();
this(Collections.emptyList());
}
private boolean isEmpty()
private SegmentsToCompact(List<DataSegment> segments)
{
Preconditions.checkState((totalSize == 0) == segments.isEmpty());
return segments.isEmpty();
}
@Nullable
private Interval getIntervalOfLastSegment()
{
if (segments.isEmpty()) {
return null;
} else {
return segments.get(segments.size() - 1).getInterval();
}
this.segments = segments;
this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
}
private int getNumSegments()
@@ -545,12 +482,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return totalSize;
}
private void clear()
{
segments.clear();
totalSize = 0;
}
@Override
public String toString()
{

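The refactor above turns the cursor into a plain java.util.Iterator whose next() yields all segments of one compactible time chunk, newest chunk first. A minimal sketch of how such a cursor is consumed; the names are hypothetical and DataSegment is replaced by a type parameter so the snippet stands alone:

// Illustrative only: generic consumer of an Iterator<List<T>>-style cursor.
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class CursorConsumerSketch
{
  // T stands in for org.apache.druid.timeline.DataSegment.
  static <T> List<T> pickFirstSuitableChunk(Iterator<List<T>> cursor, Predicate<List<T>> isSuitable)
  {
    while (cursor.hasNext()) {
      final List<T> chunk = cursor.next(); // newest time chunk first
      if (isSuitable.test(chunk)) {
        return chunk; // the whole chunk is compacted together
      }
      // otherwise the caller logs why the chunk was skipped and moves on
    }
    return Collections.emptyList(); // nothing left to compact
  }
}
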
SegmentCompactorUtil.java

@@ -20,21 +20,36 @@
package org.apache.druid.server.coordinator.helper;
import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
/**
* Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}.
*/
class SegmentCompactorUtil
{
static boolean isCompactibleSize(long targetBytes, long currentTotalBytes, long additionalBytes)
{
return currentTotalBytes + additionalBytes <= targetBytes;
}
/**
* The allowed error rate of the segment size after compaction.
* Its value is determined experimentally.
*/
private static final double ALLOWED_ERROR_OF_SEGMENT_SIZE = .2;
static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, int numAdditionalSegments)
static boolean needsCompaction(@Nullable Long targetCompactionSizeBytes, List<DataSegment> candidates)
{
return numCurrentSegments + numAdditionalSegments <= numTargetSegments;
if (targetCompactionSizeBytes == null) {
// If targetCompactionSizeBytes is null, we have no way to check whether the given segments need compaction or not.
return true;
}
final double minTargetThreshold = targetCompactionSizeBytes * (1 - ALLOWED_ERROR_OF_SEGMENT_SIZE);
final double maxTargetThreshold = targetCompactionSizeBytes * (1 + ALLOWED_ERROR_OF_SEGMENT_SIZE);
return candidates
.stream()
.filter(segment -> segment.getSize() < minTargetThreshold || segment.getSize() > maxTargetThreshold)
.count() > 1;
}
/**

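To make the 20% tolerance concrete, here is a small self-contained example with made-up sizes: for a targetCompactionSizeBytes of 800MB the accepted band is 640MB to 960MB, so an interval whose segments are already near the target (at most one outlier) is skipped, while an interval made of many small segments triggers compaction.

// Illustrative arithmetic for the needsCompaction() threshold above; all values are made up.
import java.util.Arrays;
import java.util.List;

class ToleranceBandExample
{
  public static void main(String[] args)
  {
    final long target = 800_000_000L;   // targetCompactionSizeBytes
    final double min = target * 0.8;    // 640,000,000
    final double max = target * 1.2;    // 960,000,000

    // One small segment among right-sized ones: only one outlier -> no compaction.
    List<Long> mostlyCompacted = Arrays.asList(810_000_000L, 790_000_000L, 30_000_000L);
    // Many small segments: more than one outlier -> compaction is triggered.
    List<Long> needsWork = Arrays.asList(100_000_000L, 120_000_000L, 90_000_000L);

    System.out.println(countOutliers(mostlyCompacted, min, max) > 1); // false
    System.out.println(countOutliers(needsWork, min, max) > 1);       // true
  }

  static long countOutliers(List<Long> sizes, double min, double max)
  {
    return sizes.stream().filter(s -> s < min || s > max).count();
  }
}
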
DruidCoordinatorSegmentCompactorTest.java

@@ -38,7 +38,6 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
@@ -79,18 +78,6 @@ public class DruidCoordinatorSegmentCompactorTest
segments.get(0).getInterval().getStart(),
segments.get(segments.size() - 1).getInterval().getEnd()
);
DataSegment compactSegment = new DataSegment(
segments.get(0).getDataSource(),
compactInterval,
"newVersion_" + compactVersionSuffix++,
null,
segments.get(0).getDimensions(),
segments.get(0).getMetrics(),
NoneShardSpec.instance(),
1,
segments.stream().mapToLong(DataSegment::getSize).sum()
);
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(segments.get(0).getDataSource());
segments.forEach(
segment -> timeline.remove(
@@ -99,11 +86,28 @@ public class DruidCoordinatorSegmentCompactorTest
segment.getShardSpec().createChunk(segment)
)
);
timeline.add(
compactInterval,
compactSegment.getVersion(),
compactSegment.getShardSpec().createChunk(compactSegment)
);
final String version = "newVersion_" + compactVersionSuffix++;
final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
for (int i = 0; i < 2; i++) {
DataSegment compactSegment = new DataSegment(
segments.get(0).getDataSource(),
compactInterval,
version,
null,
segments.get(0).getDimensions(),
segments.get(0).getMetrics(),
new NumberedShardSpec(i, 0),
1,
segmentSize
);
timeline.add(
compactInterval,
compactSegment.getVersion(),
compactSegment.getShardSpec().createChunk(compactSegment)
);
}
return "task_" + idSuffix++;
}
@@ -129,7 +133,7 @@ public class DruidCoordinatorSegmentCompactorTest
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
for (int j : new int[] {0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 2; k++) {
for (int k = 0; k < 4; k++) {
segments.add(createSegment(dataSource, j, true, k));
segments.add(createSegment(dataSource, j, false, k));
}
@@ -187,7 +191,7 @@ public class DruidCoordinatorSegmentCompactorTest
}
};
int expectedCompactTaskCount = 1;
int expectedRemainingSegments = 200;
int expectedRemainingSegments = 400;
// compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
assertCompactSegments(
@@ -197,7 +201,7 @@ public class DruidCoordinatorSegmentCompactorTest
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 20;
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
@@ -207,7 +211,7 @@ public class DruidCoordinatorSegmentCompactorTest
);
// compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
expectedRemainingSegments -= 20;
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
@@ -215,7 +219,7 @@ public class DruidCoordinatorSegmentCompactorTest
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 20;
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5),
@@ -225,7 +229,7 @@ public class DruidCoordinatorSegmentCompactorTest
);
for (int endDay = 4; endDay > 1; endDay -= 1) {
expectedRemainingSegments -= 20;
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay),
@@ -233,7 +237,7 @@ public class DruidCoordinatorSegmentCompactorTest
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 20;
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
@@ -296,10 +300,12 @@ public class DruidCoordinatorSegmentCompactorTest
List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(expectedInterval);
Assert.assertEquals(1, holders.size());
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holders.get(0).getObject());
Assert.assertEquals(1, chunks.size());
DataSegment segment = chunks.get(0).getObject();
Assert.assertEquals(expectedInterval, segment.getInterval());
Assert.assertEquals(expectedVersionSupplier.get(), segment.getVersion());
Assert.assertEquals(2, chunks.size());
final String expectedVersion = expectedVersionSupplier.get();
for (PartitionChunk<DataSegment> chunk : chunks) {
Assert.assertEquals(expectedInterval, chunk.getObject().getInterval());
Assert.assertEquals(expectedVersion, chunk.getObject().getVersion());
}
}
}
@@ -313,7 +319,7 @@ public class DruidCoordinatorSegmentCompactorTest
Assert.assertEquals(1, holders.size());
for (TimelineObjectHolder<String, DataSegment> holder : holders) {
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
Assert.assertEquals(2, chunks.size());
Assert.assertEquals(4, chunks.size());
for (PartitionChunk<DataSegment> chunk : chunks) {
DataSegment segment = chunk.getObject();
Assert.assertEquals(interval, segment.getInterval());
@@ -369,7 +375,7 @@ public class DruidCoordinatorSegmentCompactorTest
dataSource,
0,
50L,
50L,
20L,
null,
null,
new Period("PT1H"), // smaller than segment interval
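
In the test changes above, the mock compaction now registers two NumberedShardSpec partitions per compacted interval, each holding half of the input bytes and sharing a single new version, instead of one NoneShardSpec segment; the number of input segments per interval is also doubled, which is why the expected chunk and segment counts double throughout the file. A small illustrative sketch of that partitioning shape, with hypothetical names and no Druid types:

// Illustrative only: {partitionNum, sizeBytes} pairs for the two mock compacted partitions.
import java.util.ArrayList;
import java.util.List;

class MockCompactionResultSketch
{
  static List<long[]> compactedPartitions(long totalInputBytes)
  {
    final long sizePerPartition = totalInputBytes / 2;
    final List<long[]> partitions = new ArrayList<>();
    for (int partitionNum = 0; partitionNum < 2; partitionNum++) {
      // In the test, each pair becomes a DataSegment built with new NumberedShardSpec(partitionNum, 0).
      partitions.add(new long[]{partitionNum, sizePerPartition});
    }
    return partitions;
  }
}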