Fix CompactionTask to consider only latest segments (#6429)

* CompactionTask should consider only latest segments

* Fix test
This commit is contained in:
Jihoon Son 2018-10-08 21:53:16 -07:00 committed by Fangjin Yang
parent af9efdbedf
commit 9343cbc63a
2 changed files with 24 additions and 10 deletions

View File

@ -71,6 +71,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
@ -92,6 +93,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
public class CompactionTask extends AbstractTask
{
@ -634,19 +636,29 @@ public class CompactionTask extends AbstractTask
List<DataSegment> checkAndGetSegments(TaskToolbox toolbox) throws IOException
{
final List<DataSegment> usedSegments = toolbox.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval, null));
final List<DataSegment> usedSegments = toolbox.getTaskActionClient().submit(
new SegmentListUsedAction(dataSource, interval, null)
);
final TimelineLookup<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(usedSegments);
final List<DataSegment> latestSegments = timeline
.lookup(interval)
.stream()
.map(TimelineObjectHolder::getObject)
.flatMap(partitionHolder -> StreamSupport.stream(partitionHolder.spliterator(), false))
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
if (segments != null) {
Collections.sort(usedSegments);
Collections.sort(latestSegments);
Collections.sort(segments);
if (!usedSegments.equals(segments)) {
if (!latestSegments.equals(segments)) {
final List<DataSegment> unknownSegments = segments.stream()
.filter(segment -> !usedSegments.contains(segment))
.filter(segment -> !latestSegments.contains(segment))
.collect(Collectors.toList());
final List<DataSegment> missingSegments = usedSegments.stream()
.filter(segment -> !segments.contains(segment))
.collect(Collectors.toList());
final List<DataSegment> missingSegments = latestSegments.stream()
.filter(segment -> !segments.contains(segment))
.collect(Collectors.toList());
throw new ISE(
"Specified segments in the spec are different from the current used segments. "
+ "There are unknown segments[%s] and missing segments[%s] in the spec.",
@ -655,7 +667,7 @@ public class CompactionTask extends AbstractTask
);
}
}
return usedSegments;
return latestSegments;
}
}

View File

@ -698,7 +698,9 @@ public class CompactionTaskTest
expectedException.expectMessage(CoreMatchers.containsString("are different from the current used segments"));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
segments.remove(0);
Collections.sort(segments);
// Remove one segment in the middle
segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),