Fix compact partially overlapping segments (#9905)

* fix compact overlapping segments

* fix comment

* fix CI failure
Yuanli Han 2020-06-09 00:54:39 +08:00 committed by GitHub
parent 45b699fa4a
commit ee7bda5d8a
2 changed files with 45 additions and 31 deletions

CompactionTask.java

@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
@ -98,11 +99,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -524,10 +523,30 @@ public class CompactionTask extends AbstractBatchIndexTask
.add(p)
);
final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegments.size());
for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.getValue();
// unify overlapping intervals to ensure that overlapping segments are compacted in the same ingestion spec
List<Pair<Interval, List<Pair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified = new ArrayList<>();
Interval union = null;
List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (Map.Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
Interval cur = entry.getKey();
if (union == null) {
union = cur;
segments.addAll(entry.getValue());
} else if (union.overlaps(cur)) {
union = Intervals.utc(union.getStartMillis(), Math.max(union.getEndMillis(), cur.getEndMillis()));
segments.addAll(entry.getValue());
} else {
intervalToSegmentsUnified.add(Pair.of(union, segments));
union = cur;
segments = new ArrayList<>(entry.getValue());
}
}
intervalToSegmentsUnified.add(Pair.of(union, segments));
final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegmentsUnified.size());
for (Pair<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs;
final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentsToCompact,
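
As an aside, here is a minimal, self-contained sketch of the unification step above. It uses Joda-Time intervals and plain segment-id strings in place of Druid's Pair<QueryableIndex, DataSegment>, tracks only the unified intervals (the committed code also concatenates the per-interval segment lists), and its class and method names are illustrative rather than part of the change. It assumes the map iterates its keys in start-then-end order, which is what lets the loop compare only the running union against the next entry.

import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class IntervalUnionSketch
{
  // Merge map entries whose key intervals overlap into "umbrella" intervals, so that
  // partially overlapping segments end up in the same compaction spec.
  static List<Interval> unify(SortedMap<Interval, List<String>> intervalToSegments)
  {
    final List<Interval> unified = new ArrayList<>();
    Interval union = null;
    for (Interval cur : intervalToSegments.keySet()) {
      if (union == null) {
        union = cur;
      } else if (union.overlaps(cur)) {
        // Extend the running union so it also covers the current interval
        // (the patch itself builds the extended interval with Druid's Intervals.utc).
        union = new Interval(union.getStartMillis(), Math.max(union.getEndMillis(), cur.getEndMillis()));
      } else {
        unified.add(union);
        union = cur;
      }
    }
    if (union != null) {
      unified.add(union);
    }
    return unified;
  }

  public static void main(String[] args)
  {
    // Keys sorted by start, then end, mirroring the sorted map that CompactionTask builds.
    final Comparator<Interval> byStartThenEnd =
        Comparator.<Interval>comparingLong(Interval::getStartMillis)
                  .thenComparingLong(Interval::getEndMillis);
    final SortedMap<Interval, List<String>> map = new TreeMap<>(byStartThenEnd);
    map.put(Interval.parse("2017-06-01/2017-07-01"), Collections.singletonList("month-segment"));
    map.put(Interval.parse("2017-06-15/2017-06-16"), Collections.singletonList("day-segment"));
    map.put(Interval.parse("2017-07-01/2017-08-01"), Collections.singletonList("july-segment"));
    // The two June intervals merge into one umbrella interval; July only abuts June and stays separate.
    System.out.println(unify(map));
  }
}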
@ -710,20 +729,8 @@ public class CompactionTask extends AbstractBatchIndexTask
// Dimensions are extracted from the most recent segments to the oldest because recent segments are likely to be queried more
// frequently, and thus performance should be optimized for recent ones rather than old ones.
// timelineSegments are sorted in order of interval, but we do a sanity check here.
final Comparator<Interval> intervalComparator = Comparators.intervalsByStartThenEnd();
for (int i = 0; i < queryableIndices.size() - 1; i++) {
final Interval shouldBeSmaller = queryableIndices.get(i).lhs.getDataInterval();
final Interval shouldBeLarger = queryableIndices.get(i + 1).lhs.getDataInterval();
Preconditions.checkState(
intervalComparator.compare(shouldBeSmaller, shouldBeLarger) <= 0,
"QueryableIndexes are not sorted! Interval[%s] of segment[%s] is laster than interval[%s] of segment[%s]",
shouldBeSmaller,
queryableIndices.get(i).rhs.getId(),
shouldBeLarger,
queryableIndices.get(i + 1).rhs.getId()
);
}
// sort timelineSegments in order of interval, see https://github.com/apache/druid/pull/9905
queryableIndices.sort((o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.rhs.getInterval(), o2.rhs.getInterval()));
int index = 0;
for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
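
The second hunk in this file replaces the sorted-input sanity check with an explicit sort: once partially overlapping segments are grouped under one umbrella interval, the per-interval segment list handed to dimension extraction is no longer guaranteed to arrive pre-sorted. Below is a rough standalone sketch of the resulting ordering, with a hypothetical Segment record standing in for Druid's Pair<QueryableIndex, DataSegment> and a plain comparator standing in for Comparators.intervalsByStartThenEnd().

import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DimensionOrderSketch
{
  // Hypothetical stand-in for Pair<QueryableIndex, DataSegment>; only the segment interval matters here.
  record Segment(String id, Interval interval) {}

  public static void main(String[] args)
  {
    final List<Segment> segments = new ArrayList<>(List.of(
        new Segment("june-month", Interval.parse("2017-06-01/2017-07-01")),
        new Segment("june-day", Interval.parse("2017-06-15/2017-06-16")),
        new Segment("may-month", Interval.parse("2017-05-01/2017-06-01"))
    ));

    // Sort by interval start, then end (the role Comparators.intervalsByStartThenEnd() plays in the patch),
    // instead of asserting that the caller already supplies a sorted list.
    segments.sort(
        Comparator.<Segment>comparingLong(s -> s.interval().getStartMillis())
                  .thenComparingLong(s -> s.interval().getEndMillis())
    );

    // Walk the list newest-first, as CompactionTask does with Lists.reverse(queryableIndices),
    // so that dimensions from recent segments take precedence over same-named dimensions of older ones.
    for (int i = segments.size() - 1; i >= 0; i--) {
      System.out.println("extract dimensions from " + segments.get(i).id());
    }
  }
}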

CompactionTaskTest.java

@ -73,7 +73,6 @@ import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@ -160,7 +159,11 @@ public class CompactionTaskTest
Intervals.of("2017-03-01/2017-04-01"),
Intervals.of("2017-04-01/2017-05-01"),
Intervals.of("2017-05-01/2017-06-01"),
Intervals.of("2017-06-01/2017-07-01")
Intervals.of("2017-06-01/2017-07-01"),
// intervals overlapping 2017-06-01/2017-07-01
Intervals.of("2017-06-01/2017-06-02"),
Intervals.of("2017-06-15/2017-06-16"),
Intervals.of("2017-06-30/2017-07-01")
);
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = new HashMap<>();
private static final ParallelIndexTuningConfig TUNING_CONFIG = createTuningConfig();
@ -191,12 +194,17 @@ public class CompactionTaskTest
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-05-01/2017-06-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-06-02"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-15/2017-06-16"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-30/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
DIMENSIONS = new HashMap<>();
AGGREGATORS = new ArrayList<>();
DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN));
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
// the segment intervals above collapse into 6 non-overlapping umbrella intervals once overlaps are unified
int numUmbrellaIntervals = 6;
for (int i = 0; i < numUmbrellaIntervals; i++) {
final StringDimensionSchema schema = new StringDimensionSchema(
"string_dim_" + i,
null,
@ -204,15 +212,15 @@ public class CompactionTaskTest
);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
for (int i = 0; i < numUmbrellaIntervals; i++) {
final LongDimensionSchema schema = new LongDimensionSchema("long_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
for (int i = 0; i < numUmbrellaIntervals; i++) {
final FloatDimensionSchema schema = new FloatDimensionSchema("float_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
for (int i = 0; i < numUmbrellaIntervals; i++) {
final DoubleDimensionSchema schema = new DoubleDimensionSchema("double_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
@ -224,14 +232,13 @@ public class CompactionTaskTest
AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
SEGMENT_MAP.put(
new DataSegment(
DATA_SOURCE,
segmentInterval,
"version",
SEGMENT_INTERVALS.get(i),
"version_" + i,
ImmutableMap.of(),
findDimensions(i, segmentInterval),
findDimensions(i, SEGMENT_INTERVALS.get(i)),
AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()),
new NumberedShardSpec(0, 1),
0,
@ -285,7 +292,7 @@ public class CompactionTaskTest
dimensions.add(TIMESTAMP_COLUMN);
for (int i = 0; i < 6; i++) {
int postfix = i + startIndex;
postfix = postfix >= 6 ? postfix - 6 : postfix;
postfix = postfix % 6;
dimensions.add("string_dim_" + postfix);
dimensions.add("long_dim_" + postfix);
dimensions.add("float_dim_" + postfix);