diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index dac3bd9a095..59c9019e639 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; @@ -98,11 +99,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -524,10 +523,30 @@ public class CompactionTask extends AbstractBatchIndexTask .add(p) ); - final List specs = new ArrayList<>(intervalToSegments.size()); - for (Entry>> entry : intervalToSegments.entrySet()) { - final Interval interval = entry.getKey(); - final List> segmentsToCompact = entry.getValue(); + // unify overlapping intervals to ensure overlapping segments compacting in the same indexSpec + List>>> intervalToSegmentsUnified = new ArrayList<>(); + Interval union = null; + List> segments = new ArrayList<>(); + for (Map.Entry>> entry : intervalToSegments.entrySet()) { + Interval cur = entry.getKey(); + if (union == null) { + union = cur; + segments.addAll(entry.getValue()); + } else if (union.overlaps(cur)) { + union = Intervals.utc(union.getStartMillis(), Math.max(union.getEndMillis(), cur.getEndMillis())); + segments.addAll(entry.getValue()); + } else { + intervalToSegmentsUnified.add(Pair.of(union, segments)); + union = cur; + segments = new ArrayList<>(entry.getValue()); + } + } + intervalToSegmentsUnified.add(Pair.of(union, segments)); + + final List specs = new ArrayList<>(intervalToSegmentsUnified.size()); + for (Pair>> entry : intervalToSegmentsUnified) { + final Interval interval = entry.lhs; + final List> segmentsToCompact = entry.rhs; final DataSchema dataSchema = createDataSchema( segmentProvider.dataSource, segmentsToCompact, @@ -710,20 +729,8 @@ public class CompactionTask extends AbstractBatchIndexTask // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more // frequently, and thus the performance should be optimized for recent ones rather than old ones. - // timelineSegments are sorted in order of interval, but we do a sanity check here. - final Comparator intervalComparator = Comparators.intervalsByStartThenEnd(); - for (int i = 0; i < queryableIndices.size() - 1; i++) { - final Interval shouldBeSmaller = queryableIndices.get(i).lhs.getDataInterval(); - final Interval shouldBeLarger = queryableIndices.get(i + 1).lhs.getDataInterval(); - Preconditions.checkState( - intervalComparator.compare(shouldBeSmaller, shouldBeLarger) <= 0, - "QueryableIndexes are not sorted! Interval[%s] of segment[%s] is laster than interval[%s] of segment[%s]", - shouldBeSmaller, - queryableIndices.get(i).rhs.getId(), - shouldBeLarger, - queryableIndices.get(i + 1).rhs.getId() - ); - } + // sort timelineSegments in order of interval, see https://github.com/apache/druid/pull/9905 + queryableIndices.sort((o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.rhs.getInterval(), o2.rhs.getInterval())); int index = 0; for (Pair pair : Lists.reverse(queryableIndices)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 1d4bec4f99e..66ee8595d25 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -73,7 +73,6 @@ import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; @@ -160,7 +159,11 @@ public class CompactionTaskTest Intervals.of("2017-03-01/2017-04-01"), Intervals.of("2017-04-01/2017-05-01"), Intervals.of("2017-05-01/2017-06-01"), - Intervals.of("2017-06-01/2017-07-01") + Intervals.of("2017-06-01/2017-07-01"), + // overlapping intervals + Intervals.of("2017-06-01/2017-06-02"), + Intervals.of("2017-06-15/2017-06-16"), + Intervals.of("2017-06-30/2017-07-01") ); private static final Map MIXED_TYPE_COLUMN_MAP = new HashMap<>(); private static final ParallelIndexTuningConfig TUNING_CONFIG = createTuningConfig(); @@ -191,12 +194,17 @@ public class CompactionTaskTest MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-05-01/2017-06-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN)); MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN)); + MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-06-02"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN)); + MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-15/2017-06-16"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN)); + MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-30/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN)); + DIMENSIONS = new HashMap<>(); AGGREGATORS = new ArrayList<>(); DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME)); DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN)); - for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { + int numUmbrellaIntervals = 6; + for (int i = 0; i < numUmbrellaIntervals; i++) { final StringDimensionSchema schema = new StringDimensionSchema( "string_dim_" + i, null, @@ -204,15 +212,15 @@ public class CompactionTaskTest ); DIMENSIONS.put(schema.getName(), schema); } - for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { + for (int i = 0; i < numUmbrellaIntervals; i++) { final LongDimensionSchema schema = new LongDimensionSchema("long_dim_" + i); DIMENSIONS.put(schema.getName(), schema); } - for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { + for (int i = 0; i < numUmbrellaIntervals; i++) { final FloatDimensionSchema schema = new FloatDimensionSchema("float_dim_" + i); DIMENSIONS.put(schema.getName(), schema); } - for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { + for (int i = 0; i < numUmbrellaIntervals; i++) { final DoubleDimensionSchema schema = new DoubleDimensionSchema("double_dim_" + i); DIMENSIONS.put(schema.getName(), schema); } @@ -224,14 +232,13 @@ public class CompactionTaskTest AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4")); for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) { - final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2))); SEGMENT_MAP.put( new DataSegment( DATA_SOURCE, - segmentInterval, - "version", + SEGMENT_INTERVALS.get(i), + "version_" + i, ImmutableMap.of(), - findDimensions(i, segmentInterval), + findDimensions(i, SEGMENT_INTERVALS.get(i)), AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()), new NumberedShardSpec(0, 1), 0, @@ -285,7 +292,7 @@ public class CompactionTaskTest dimensions.add(TIMESTAMP_COLUMN); for (int i = 0; i < 6; i++) { int postfix = i + startIndex; - postfix = postfix >= 6 ? postfix - 6 : postfix; + postfix = postfix % 6; dimensions.add("string_dim_" + postfix); dimensions.add("long_dim_" + postfix); dimensions.add("float_dim_" + postfix);