From 9343cbc63aa006a1dadaab2e530e3321ee9e29f9 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 8 Oct 2018 21:53:16 -0700 Subject: [PATCH] Fix CompactionTask to consider only latest segments (#6429) * CompactionTask should consider only latest segments * fix test --- .../indexing/common/task/CompactionTask.java | 30 +++++++++++++------ .../common/task/CompactionTaskTest.java | 4 ++- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 4b0da2adfa6..3c6992374f9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -71,6 +71,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; @@ -92,6 +93,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.StreamSupport; public class CompactionTask extends AbstractTask { @@ -634,19 +636,29 @@ public class CompactionTask extends AbstractTask List checkAndGetSegments(TaskToolbox toolbox) throws IOException { - final List usedSegments = toolbox.getTaskActionClient() - .submit(new SegmentListUsedAction(dataSource, interval, null)); + final List usedSegments = toolbox.getTaskActionClient().submit( + new SegmentListUsedAction(dataSource, interval, null) + ); + final TimelineLookup timeline = VersionedIntervalTimeline.forSegments(usedSegments); + final List latestSegments = timeline + .lookup(interval) + .stream() + .map(TimelineObjectHolder::getObject) + .flatMap(partitionHolder -> StreamSupport.stream(partitionHolder.spliterator(), false)) + .map(PartitionChunk::getObject) + .collect(Collectors.toList()); + if (segments != null) { - Collections.sort(usedSegments); + Collections.sort(latestSegments); Collections.sort(segments); - if (!usedSegments.equals(segments)) { + if (!latestSegments.equals(segments)) { final List unknownSegments = segments.stream() - .filter(segment -> !usedSegments.contains(segment)) + .filter(segment -> !latestSegments.contains(segment)) .collect(Collectors.toList()); - final List missingSegments = usedSegments.stream() - .filter(segment -> !segments.contains(segment)) - .collect(Collectors.toList()); + final List missingSegments = latestSegments.stream() + .filter(segment -> !segments.contains(segment)) + .collect(Collectors.toList()); throw new ISE( "Specified segments in the spec are different from the current used segments. " + "There are unknown segments[%s] and missing segments[%s] in the spec.", @@ -655,7 +667,7 @@ public class CompactionTask extends AbstractTask ); } } - return usedSegments; + return latestSegments; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 837b33a50a0..eb80011bc1c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -698,7 +698,9 @@ public class CompactionTaskTest expectedException.expectMessage(CoreMatchers.containsString("are different from the current used segments")); final List segments = new ArrayList<>(SEGMENTS); - segments.remove(0); + Collections.sort(segments); + // Remove one segment in the middle + segments.remove(segments.size() / 2); CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(segments),