Fix bug in auto compaction needsCompaction method that can skip segments incorrectly (#11366)

* fix bug in needsCompaction

* fix bug in needsCompaction
This commit is contained in:
Maytas Monsereenusorn 2021-06-23 13:03:41 -07:00 committed by GitHub
parent 267c298293
commit 911a0c6c8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 11 deletions

View File

@ -365,7 +365,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
} else {
configuredIndexSpec = tuningConfig.getIndexSpec();
}
boolean needsCompaction = false;
if (!Objects.equals(partitionsSpecFromConfig, segmentPartitionsSpec)) {
log.info(
"Configured partitionsSpec[%s] is differenet from "
@ -373,7 +372,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
partitionsSpecFromConfig,
segmentPartitionsSpec
);
needsCompaction = true;
return true;
}
// segmentIndexSpec cannot be null.
if (!segmentIndexSpec.equals(configuredIndexSpec)) {
@ -382,7 +381,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
configuredIndexSpec,
segmentIndexSpec
);
needsCompaction = true;
return true;
}
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
@ -393,24 +392,28 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (existingSegmentGranularity == null) {
// Candidate segments were all compacted without segment granularity set.
// We need to check if all segments have the same segment granularity as the configured segment granularity.
needsCompaction = candidates.segments.stream()
boolean needsCompaction = candidates.segments.stream()
.anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
log.info(
"Segments were previously compacted but without segmentGranularity in auto compaction."
+ " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
config.getGranularitySpec().getSegmentGranularity()
);
if (needsCompaction) {
log.info(
"Segments were previously compacted but without segmentGranularity in auto compaction."
+ " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
config.getGranularitySpec().getSegmentGranularity()
);
return true;
}
} else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
log.info(
"Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction",
config.getGranularitySpec().getSegmentGranularity(),
existingSegmentGranularity
);
needsCompaction = true;
return true;
}
}
return needsCompaction;
return false;
}
/**

View File

@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.timeline.CompactionState;
@ -964,6 +965,56 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorReturnsSegmentsAsCompactionStateChangedWithCompactedStateHasSameSegmentGranularity()
{
// Different indexSpec as what is set in the auto compaction config
IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null);
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
new CompactionState(partitionsSpec, newIndexSpecMap, null)
)
);
// Duration of new segmentGranularity is the same as before (P1D)
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(
130000,
new Period("P0D"),
new UserCompactionTaskGranularityConfig(
new PeriodGranularity(
new Period("P1D"),
null,
DateTimeZone.UTC
),
null
)
)
),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
// We should get all segments in timeline back since indexSpec changed
Assert.assertTrue(iterator.hasNext());
List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-03T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
);
// No more
Assert.assertFalse(iterator.hasNext());
}
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,