Fix auto compaction to compact only same or abutting intervals (#6808)

* Fix auto compaction to compact only same or abutting intervals

* fix test
This commit is contained in:
Jihoon Son 2019-01-16 14:54:11 -08:00 committed by Jonathan Wei
parent 5b8a221713
commit a07e66c540
3 changed files with 91 additions and 58 deletions

View File

@ -269,6 +269,15 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(timeChunkHolder.getObject().iterator());
final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
final boolean isSameOrAbuttingInterval;
final Interval lastInterval = segmentsToCompact.getIntervalOfLastSegment();
if (lastInterval == null) {
isSameOrAbuttingInterval = true;
} else {
final Interval currentInterval = chunks.get(0).getObject().getInterval();
isSameOrAbuttingInterval = currentInterval.isEqual(lastInterval) || currentInterval.abuts(lastInterval);
}
// The segments in a holder should be added all together or not.
final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
inputSegmentSize,
@ -280,7 +289,10 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
segmentsToCompact.getNumSegments(),
chunks.size()
);
if (isCompactibleSize && isCompactibleNum && (!keepSegmentGranularity || segmentsToCompact.isEmpty())) {
if (isCompactibleSize
&& isCompactibleNum
&& isSameOrAbuttingInterval
&& (!keepSegmentGranularity || segmentsToCompact.isEmpty())) {
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
} else {
if (segmentsToCompact.getNumSegments() > 1) {
@ -514,6 +526,16 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return segments.isEmpty();
}
@Nullable
private Interval getIntervalOfLastSegment()
{
if (segments.isEmpty()) {
return null;
} else {
return segments.get(segments.size() - 1).getInterval();
}
}
private int getNumSegments()
{
return segments.size();

View File

@ -226,23 +226,23 @@ public class DruidCoordinatorSegmentCompactorTest
expectedVersionSupplier
);
// compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
expectedRemainingSegments -= 40;
// compact for 2017-01-08T00:00:00.000Z/2017-01-08T12:00:00.000Z
expectedRemainingSegments -= 20;
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 4, 8),
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
for (int endDay = 4; endDay > 1; endDay -= 1) {
for (int endDay = 5; endDay > 1; endDay -= 1) {
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", endDay - 1, endDay),
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
@ -362,7 +362,7 @@ public class DruidCoordinatorSegmentCompactorTest
// One of dataSource is compacted
if (expectedRemainingSegments > 0) {
// If expectedRemainingSegments is positive, we check how many dataSources have the segments waiting
// If expectedRemainingSegments is positive, we check how many dataSources have the segments waiting for
// compaction.
long numDataSourceOfExpectedRemainingSegments = stats
.getDataSources(DruidCoordinatorSegmentCompactor.SEGMENT_SIZE_WAIT_COMPACT)

View File

@ -110,62 +110,19 @@ public class NewestSegmentFirstPolicyTest
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-16T21:00:00/2017-11-16T22:00:00"),
Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"),
Intervals.of("2017-11-17T02:00:00/2017-11-17T03:00:00"),
false
);
final List<DataSegment> segments = iterator.next();
Assert.assertNotNull(segments);
if (keepSegmentGranularity) {
// If keepSegmentGranularity = true, the iterator returns the segments of only the next time chunk.
Assert.assertEquals(4, segments.size());
final List<Interval> expectedIntervals = new ArrayList<>(segments.size());
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"));
}
Assert.assertEquals(
expectedIntervals,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"),
Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"),
false
);
} else {
// If keepSegmentGranularity = false, the returned segments can span over multiple time chunks.
Assert.assertEquals(8, segments.size());
final List<Interval> expectedIntervals = new ArrayList<>(segments.size());
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"));
}
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"));
}
expectedIntervals.sort(Comparators.intervalsByStartThenEnd());
Assert.assertEquals(
expectedIntervals,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"),
Intervals.of("2017-11-16T05:00:00/2017-11-16T06:00:00"),
Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"),
true
);
}
}
@Test
public void testLargeGapInData()
@ -588,6 +545,52 @@ public class NewestSegmentFirstPolicyTest
);
}
@Test
public void testHoleInSearchInterval()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("PT1H"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
new SegmentGenerateSpec(Intervals.of("2017-11-16T00:00:00/2017-11-17T00:00:00"), segmentPeriod)
)
),
ImmutableMap.of(
DATA_SOURCE,
ImmutableList.of(
Intervals.of("2017-11-16T04:00:00/2017-11-16T10:00:00"),
Intervals.of("2017-11-16T14:00:00/2017-11-16T20:00:00")
)
)
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"),
Intervals.of("2017-11-16T22:00:00/2017-11-16T23:00:00"),
false
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-16T10:00:00/2017-11-16T11:00:00"),
Intervals.of("2017-11-16T13:00:00/2017-11-16T14:00:00"),
false
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-16T00:00:00/2017-11-16T01:00:00"),
Intervals.of("2017-11-16T03:00:00/2017-11-16T04:00:00"),
true
);
}
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,
@ -600,6 +603,14 @@ public class NewestSegmentFirstPolicyTest
while (iterator.hasNext()) {
final List<DataSegment> segments = iterator.next();
final Interval firstInterval = segments.get(0).getInterval();
Assert.assertTrue(
"Intervals should be same or abutting",
segments.stream().allMatch(
segment -> segment.getInterval().isEqual(firstInterval) || segment.getInterval().abuts(firstInterval)
)
);
final List<Interval> expectedIntervals = new ArrayList<>(segments.size());
for (int i = 0; i < segments.size(); i++) {
if (i > 0 && i % DEFAULT_NUM_SEGMENTS_PER_SHARD == 0) {