Fix auto compaction when the firstSegment is in skipOffset (#6738)

* Fix auto compaction when the firstSegment is in skipOffset

* remove duplicate
Author: Jihoon Son, 2018-12-18 03:10:46 -08:00 (committed by Benedict Jin)
Parent: 2c380e3a26
Commit: f0ee6bf898
2 changed files with 70 additions and 20 deletions
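
The failure mode: when the newest segment's end minus skipOffsetFromLatest falls before the oldest segment's start, the old code constructed an inverted Joda-Time Interval for the timeline lookup, and Joda-Time rejects an interval whose end precedes its start. A minimal standalone sketch of the repro, mirroring the data in the new testIfFirstSegmentIsInSkipOffset below (class and variable names are illustrative, not part of this commit):

import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

public class SkipOffsetRepro
{
  public static void main(String[] args)
  {
    // One segment, entirely inside the last day (the skip offset window).
    final Interval segment = new Interval(
        DateTime.parse("2017-12-02T14:00:00Z"),
        DateTime.parse("2017-12-03T00:00:00Z")
    );
    final Period skipOffset = new Period("P1D");

    final DateTime lookupStart = segment.getStart();               // 2017-12-02T14:00Z
    final DateTime lookupEnd = segment.getEnd().minus(skipOffset); // 2017-12-02T00:00Z

    // The old code constructed this interval unconditionally; with an end before
    // the start, Joda-Time throws IllegalArgumentException. The fix checks
    // lookupStart.isBefore(lookupEnd) first and returns null instead, so the
    // dataSource is simply skipped for this round of compaction.
    new Interval(lookupStart, lookupEnd); // throws
  }
}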

NewestSegmentFirstIterator.java

@@ -30,6 +30,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -80,7 +81,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
     if (config != null && !timeline.isEmpty()) {
       final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
-      timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+      if (searchInterval != null) {
+        timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+      }
     }
   }
@@ -339,8 +342,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
    * @param timeline   timeline of a dataSource
    * @param skipOffset skipOffset
    *
-   * @return found searchInterval
+   * @return found interval to search or null if it's not found
    */
+  @Nullable
   private static Interval findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
       Period skipOffset
@@ -354,25 +358,31 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
     final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
 
-    final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
-        new Interval(first.getInterval().getStart(), last.getInterval().getEnd().minus(skipOffset))
-    );
-
-    final List<DataSegment> segments = holders
-        .stream()
-        .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
-        .map(PartitionChunk::getObject)
-        .filter(segment -> !segment.getInterval().overlaps(skipInterval))
-        .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
-        .collect(Collectors.toList());
-
-    if (segments.isEmpty()) {
-      return new Interval(first.getInterval().getStart(), first.getInterval().getStart());
-    } else {
-      return new Interval(
-          segments.get(0).getInterval().getStart(),
-          segments.get(segments.size() - 1).getInterval().getEnd()
-      );
+    final DateTime lookupStart = first.getInterval().getStart();
+    final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
+    if (lookupStart.isBefore(lookupEnd)) {
+      final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
+          new Interval(lookupStart, lookupEnd)
+      );
+
+      final List<DataSegment> segments = holders
+          .stream()
+          .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
+          .map(PartitionChunk::getObject)
+          .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+          .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
+          .collect(Collectors.toList());
+
+      if (segments.isEmpty()) {
+        return null;
+      } else {
+        return new Interval(
+            segments.get(0).getInterval().getStart(),
+            segments.get(segments.size() - 1).getInterval().getEnd()
+        );
+      }
+    } else {
+      return null;
     }
   }
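
One detail of the new guard: Joda-Time does allow an empty interval (end equal to start), which is what the old segments.isEmpty() fallback returned, and only rejects an inverted one, so the strict isBefore check both prevents the exception and short-circuits a degenerate empty lookup window. A two-line sketch of that rule (standalone, not part of this commit):

import org.joda.time.DateTime;
import org.joda.time.Interval;

public class JodaIntervalRule
{
  public static void main(String[] args)
  {
    final DateTime t = DateTime.parse("2017-12-02T00:00:00Z");
    System.out.println(new Interval(t, t)); // legal: an empty interval is allowed
    new Interval(t, t.minusHours(1));       // throws IllegalArgumentException: end is before start
  }
}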

NewestSegmentFirstPolicyTest.java

@@ -496,6 +496,46 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testIfFirstSegmentIsInSkipOffset()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-02T14:00:00/2017-12-03T00:00:00"),
+            new Period("PT5H"),
+            40000,
+            1
+        )
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIfFirstSegmentOverlapsSkipOffset()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"),
+            new Period("PT5H"),
+            40000,
+            1
+        )
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,
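
For checking the two tests by hand, a small standalone sketch of the interval arithmetic (class name and printouts are illustrative, not part of this commit):

import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;
import org.joda.time.Period;

public class SkipOffsetArithmetic
{
  public static void main(String[] args)
  {
    final Period skipOffset = new Period("P1D");

    // testIfFirstSegmentIsInSkipOffset: segments cover 2017-12-02T14:00Z..2017-12-03T00:00Z.
    // lookupEnd = 2017-12-03T00:00Z minus P1D = 2017-12-02T00:00Z, which is before
    // lookupStart (2017-12-02T14:00Z), so findInitialSearchInterval returns null
    // (and, before this fix, threw on interval construction).
    final Interval total1 = Intervals.of("2017-12-02T14:00:00/2017-12-03T00:00:00");
    System.out.println(total1.getStart().isBefore(total1.getEnd().minus(skipOffset))); // false

    // testIfFirstSegmentOverlapsSkipOffset: segments cover 2017-12-01T23:00Z..2017-12-03T00:00Z
    // in PT5H chunks. The lookup window 2017-12-01T23:00Z/2017-12-02T00:00Z is non-empty,
    // but the only segment in it (2017-12-01T23:00Z/2017-12-02T04:00Z) overlaps the skip
    // interval 2017-12-02T00:00Z/2017-12-03T00:00Z, so it is filtered out and null is
    // returned again; either way the iterator yields nothing.
    final Interval total2 = Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00");
    final Interval skipInterval = new Interval(skipOffset, total2.getEnd());
    final Interval firstSegment = Intervals.of("2017-12-01T23:00:00/2017-12-02T04:00:00");
    System.out.println(firstSegment.overlaps(skipInterval)); // true
  }
}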