Auto-compaction with segment granularity retrieves incomplete segments from the timeline when intervals overlap (#11019)

* Fix Auto-compaction with segment granularity retrieving incomplete segments from the timeline when intervals overlap

* address comments
Maytas Monsereenusorn 2021-03-24 11:37:29 -07:00 committed by GitHub
parent 8296123d89
commit c87ac0823f
3 changed files with 119 additions and 40 deletions


@@ -438,6 +438,67 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
forceTriggerAutoCompaction(1);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(expectedIntervalAfterCompaction);
loadData(INDEX_TASK);
verifySegmentsCount(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
// 5 segments. 1 compacted YEAR segment and 4 newly ingested DAY segments across 2 days
// We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from the earlier compaction
// two segments with interval of 2013-08-31/2013-09-01 (newly ingested with DAY)
// and two segments with interval of 2013-09-01/2013-09-02 (newly ingested with DAY)
expectedIntervalAfterCompaction.addAll(intervalsBeforeCompaction);
checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.MONTH;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
// We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from the earlier compaction
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : Granularities.YEAR.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
// one segment with interval of 2013-09-01/2013-10-01 (compacted with MONTH)
// and one segment with interval of 2013-10-01/2013-11-01 (compacted with MONTH)
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : Granularities.MONTH.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(3, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);

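The expected-interval loops in the test above rely on Granularity.getIterable, which yields the granularity-aligned buckets that cover an interval, so each ingested DAY interval maps to the single YEAR or MONTH bucket containing it. A minimal standalone sketch (not part of this patch; the class name is illustrative):

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class GranularityBucketSketch
{
  public static void main(String[] args)
  {
    Interval day = Intervals.of("2013-09-01/2013-09-02");
    // YEAR bucket covering the day: 2013-01-01/2014-01-01
    for (Interval bucket : Granularities.YEAR.getIterable(day)) {
      System.out.println(bucket);
    }
    // MONTH bucket covering the day: 2013-09-01/2013-10-01
    for (Interval bucket : Granularities.MONTH.getIterable(day)) {
      System.out.println(bucket);
    }
  }
}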

@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -30,7 +31,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
@@ -42,13 +42,11 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.Streams;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -110,8 +108,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new HashMap<>();
Map<Pair<Interval, SegmentId>, String> originalVersion = new HashMap<>();
VersionedIntervalTimeline<String, DataSegment> originalTimeline = null;
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
String temporaryVersion = DateTimes.nowUtc().toString();
Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
@@ -135,18 +132,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
int partitions = segmentSet.size();
for (DataSegment segment : segmentSet) {
DataSegment segmentsForCompact = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions));
// PartitionHolder can only hold chunks of one partition space.
// However, a partition in the new timeline (timelineWithConfiguredSegmentGranularity) can hold multiple
// partitions of the original timeline (when the new segmentGranularity is larger than the original
// segmentGranularity). Hence, we group all the segments of the original timeline into interval buckets
// defined by the new configuredSegmentGranularity. We then convert each segment into a new partition space so that
// there is no duplicate partitionNum across all segments of each new Interval. We will have to save the
// original ShardSpec to convert the segment back when returning from the iterator.
originalShardSpecs.put(new Pair<>(interval, segmentsForCompact.getId()), segment.getShardSpec());
// Segment versions may be mixed in the same time chunk based on the new segment granularity.
// Hence we store the original version and create the new timeline with a temporary version,
// setting the fake version to be the same for all segments in the same new time bucket.
originalVersion.put(new Pair<>(interval, segmentsForCompact.getId()), segment.getVersion());
timelineWithConfiguredSegmentGranularity.add(
interval,
temporaryVersion,
@@ -155,12 +140,24 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
partitionNum += 1;
}
}
// PartitionHolder can only hold chunks of one partition space.
// However, a partition in the new timeline (timelineWithConfiguredSegmentGranularity) can hold multiple
// partitions of the original timeline (when the new segmentGranularity is larger than the original
// segmentGranularity). Hence, we group all the segments of the original timeline into interval buckets
// defined by the new configuredSegmentGranularity. We then convert each segment into a new partition space so that
// there is no duplicate partitionNum across all segments of each new Interval.
// Similarly, segment versions may be mixed in the same time chunk based on the new segment granularity.
// Hence we create the new timeline with a temporary version, setting the fake version to be the same
// for all segments in the same new time bucket.
// We save the originalTimeline so that we can use it to get the original ShardSpec and original version
// back when converting segments to return from this iterator.
originalTimeline = timeline;
timeline = timelineWithConfiguredSegmentGranularity;
}
final List<Interval> searchIntervals =
findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), configuredSegmentGranularity, skipIntervals.get(dataSource));
if (!searchIntervals.isEmpty()) {
timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, originalShardSpecs, originalVersion));
timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals, originalTimeline));
}
}
});
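A condensed sketch of the re-bucketing described by the comment block above, assuming (the bucketing step itself is outside the visible hunk) that each original segment is assigned to the single configuredSegmentGranularity bucket containing its start time; the class and method names here are illustrative, not the patch's:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;

class RebucketSketch
{
  static void rebucket(
      Granularity configuredSegmentGranularity,
      List<DataSegment> originalSegments,
      VersionedIntervalTimeline<String, DataSegment> timelineWithConfiguredSegmentGranularity
  )
  {
    // One fake version shared by every segment that lands in the same new time bucket.
    String temporaryVersion = DateTimes.nowUtc().toString();
    Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
    for (DataSegment segment : originalSegments) {
      // Group segments by the (coarser) configured segment granularity bucket.
      Interval bucket = configuredSegmentGranularity.bucket(segment.getInterval().getStart());
      intervalToPartitionMap.computeIfAbsent(bucket, k -> new HashSet<>()).add(segment);
    }
    for (Map.Entry<Interval, Set<DataSegment>> entry : intervalToPartitionMap.entrySet()) {
      Interval interval = entry.getKey();
      Set<DataSegment> segmentSet = entry.getValue();
      int partitions = segmentSet.size();
      int partitionNum = 0;
      for (DataSegment segment : segmentSet) {
        // Renumber into a fresh partition space so partitionNums are unique within the new bucket.
        DataSegment renumbered = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions));
        timelineWithConfiguredSegmentGranularity.add(
            interval,
            temporaryVersion,
            NumberedPartitionChunk.make(partitionNum, partitions, renumbered)
        );
        partitionNum += 1;
      }
    }
  }
}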
@@ -249,14 +246,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
private final Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs;
private final Map<Pair<Interval, SegmentId>, String> originalVersion;
@Nullable
private final VersionedIntervalTimeline<String, DataSegment> originalTimeline;
CompactibleTimelineObjectHolderCursor(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<Interval> totalIntervalsToSearch,
Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs,
Map<Pair<Interval, SegmentId>, String> originalVersion
// originalTimeline can be null if the timeline was not modified
@Nullable VersionedIntervalTimeline<String, DataSegment> originalTimeline
)
{
this.holders = totalIntervalsToSearch
@@ -267,8 +264,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.filter(holder -> isCompactibleHolder(interval, holder))
)
.collect(Collectors.toList());
this.originalShardSpecs = originalShardSpecs;
this.originalVersion = originalVersion;
this.originalTimeline = originalTimeline;
}
private boolean isCompactibleHolder(Interval interval, TimelineObjectHolder<String, DataSegment> holder)
@@ -288,18 +284,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return partitionBytes > 0;
}
private DataSegment transformDataSegmentIfNeeded(DataSegment dataSegment, Interval interval)
{
if (originalShardSpecs != null && originalVersion != null && !originalShardSpecs.isEmpty() && !originalVersion.isEmpty()) {
DataSegment.Builder transformedSegmentBuilder = DataSegment.builder(dataSegment);
transformedSegmentBuilder.shardSpec(originalShardSpecs.get(new Pair<>(interval, dataSegment.getId())));
transformedSegmentBuilder.version(originalVersion.get(new Pair<>(interval, dataSegment.getId())));
return transformedSegmentBuilder.build();
} else {
return dataSegment;
}
}
@Override
public boolean hasNext()
{
@@ -313,10 +297,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
throw new NoSuchElementException();
}
TimelineObjectHolder<String, DataSegment> timelineObjectHolder = holders.remove(holders.size() - 1);
return Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
.map(PartitionChunk::getObject)
.map(dataSegment -> transformDataSegmentIfNeeded(dataSegment, timelineObjectHolder.getTrueInterval()))
.collect(Collectors.toList());
List<DataSegment> candidates = Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
if (originalTimeline != null) {
Interval umbrellaInterval = JodaUtils.umbrellaInterval(candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
return Lists.newArrayList(originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval, Partitions.ONLY_COMPLETE));
}
return candidates;
}
}
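Restating the new next() lookup as a self-contained helper (the class and method names are hypothetical, not part of the patch): the umbrella interval of the candidates taken from the re-bucketed timeline is used to pull every complete, non-overshadowed segment from the original timeline, so segments only partially covered by the new time chunk are not dropped.

import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;

class UmbrellaLookupSketch
{
  static List<DataSegment> resolveFromOriginalTimeline(
      List<DataSegment> candidates,
      VersionedIntervalTimeline<String, DataSegment> originalTimeline
  )
  {
    // Widen to the span jointly covered by all candidate segments.
    Interval umbrellaInterval = JodaUtils.umbrellaInterval(
        candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList())
    );
    // Return the complete original segments (original ShardSpec and version) inside that span.
    return Lists.newArrayList(
        originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval, Partitions.ONLY_COMPLETE)
    );
  }
}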


@@ -934,6 +934,36 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), new Period("P1D"), "1994-04-29T00:00:00.000Z", null),
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
// We should get all segments in timeline back since skip offset is P0D.
// Although the first iteration only covers the last hour of 2017-10-01 (2017-10-01T23:00:00/2017-10-02T00:00:00),
// the iterator will return all segments, as the umbrella interval of the DAY segment (2017-10-01T00:00:00/2017-10-02T00:00:00)
// also covers the HOUR segment (2017-10-01T01:00:00/2017-10-01T02:00:00)
Assert.assertTrue(iterator.hasNext());
List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
);
// No more
Assert.assertFalse(iterator.hasNext());
}
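As a quick check of the comment above (a hypothetical snippet, not part of the test): in the first iteration the only candidate is the DAY segment, its umbrella interval is the whole day, and that interval also covers the HOUR segment, which is why the original-timeline lookup returns both.

import java.util.Collections;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.joda.time.Interval;

public class UmbrellaCoverageCheck
{
  public static void main(String[] args)
  {
    // Interval of the single candidate (the DAY segment) seen in the last HOUR chunk.
    Interval umbrella = JodaUtils.umbrellaInterval(
        Collections.singletonList(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"))
    );
    // The umbrella also covers the HOUR segment, so that segment is returned as well.
    System.out.println(umbrella.contains(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"))); // true
  }
}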
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,