From fa7cb906e40021549648848caada38cf80338b7b Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 27 Dec 2018 18:03:44 -0800
Subject: [PATCH] Fix auto compaction to consider intervals of running tasks
 (#6767)

* Fix auto compaction to consider intervals of running tasks

* adjust initial collection size
---
 .../NewestSegmentFirstPolicyBenchmark.java    |   3 +-
 .../client/indexing/ClientAppendQuery.java    |   4 +-
 .../client/indexing/ClientCompactQuery.java   |   4 +-
 .../client/indexing/ClientKillQuery.java      |   4 +-
 .../client/indexing/ClientMergeQuery.java     |   4 +-
 .../druid/client/indexing/ClientQuery.java    |  41 +++++
 .../indexing/HttpIndexingServiceClient.java   |  20 +++
 .../indexing/IndexingServiceClient.java       |   3 +
 .../client/indexing/TaskPayloadResponse.java  |  60 +++++++
 .../helper/CompactionSegmentSearchPolicy.java |   5 +-
 .../DruidCoordinatorSegmentCompactor.java     |  44 ++++-
 .../helper/NewestSegmentFirstIterator.java    | 155 ++++++++++++++----
 .../helper/NewestSegmentFirstPolicy.java      |   7 +-
 .../indexing/NoopIndexingServiceClient.java   |   6 +
 .../NewestSegmentFirstIteratorTest.java       |  76 +++++++++
 .../helper/NewestSegmentFirstPolicyTest.java  |  76 +++++++--
 16 files changed, 451 insertions(+), 61 deletions(-)
 create mode 100644 server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
 create mode 100644 server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
 create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java

diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 760c0a523a3..658a3314904 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -42,6 +42,7 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.infra.Blackhole;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -137,7 +138,7 @@ public class NewestSegmentFirstPolicyBenchmark
   @Benchmark
   public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
   {
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
     for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
       final List<DataSegment> segments = iterator.next();
       blackhole.consume(segments);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
index 4271200e7f6..c5497b8cb9d 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 /**
  */
-public class ClientAppendQuery
+public class ClientAppendQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -43,12 +43,14 @@ public class ClientAppendQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "append";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index 1fe2b0417ea..eec9004c752 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -26,7 +26,7 @@ import org.apache.druid.timeline.DataSegment;
 import java.util.List;
 import java.util.Map;
 
-public class ClientCompactQuery
+public class ClientCompactQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -54,12 +54,14 @@ public class ClientCompactQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "compact";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
index 4d972463295..06d88f9535a 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
@@ -25,7 +25,7 @@ import org.joda.time.Interval;
 
 /**
  */
-public class ClientKillQuery
+public class ClientKillQuery implements ClientQuery
 {
   private final String dataSource;
   private final Interval interval;
@@ -41,12 +41,14 @@ public class ClientKillQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "kill";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
index 3800953950f..b61c6e6a11e 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
@@ -28,7 +28,7 @@ import java.util.List;
 
 /**
  */
-public class ClientMergeQuery
+public class ClientMergeQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -48,12 +48,14 @@ public class ClientMergeQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "merge";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
new file mode 100644
index 00000000000..6dbd631baf5
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * org.apache.druid.indexing.common.task.Task representation for clients
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = "append", value = ClientAppendQuery.class),
+    @Type(name = "merge", value = ClientMergeQuery.class),
+    @Type(name = "kill", value = ClientKillQuery.class),
+    @Type(name = "compact", value = ClientCompactQuery.class)
+})
+public interface ClientQuery
+{
+  String getType();
+
+  String getDataSource();
+}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 09a4753ea3c..b8960c03c58 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -278,6 +278,26 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
     return completeTaskStatuses.isEmpty() ? null : completeTaskStatuses.get(0);
   }
 
+  @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    try {
+      final FullResponseHolder responseHolder = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
+      );
+
+      return jsonMapper.readValue(
+          responseHolder.getContent(),
+          new TypeReference<TaskPayloadResponse>()
+          {
+          }
+      );
+    }
+    catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public int killPendingSegments(String dataSource, DateTime end)
   {
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index d9d35f5e624..8a54bb4c5eb 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -61,4 +61,7 @@ public interface IndexingServiceClient
 
   @Nullable
   TaskStatusPlus getLastCompleteTask();
+
+  @Nullable
+  TaskPayloadResponse getTaskPayload(String taskId);
 }
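[Editor's note — illustrative sketch, not part of the patch. ClientQuery's @JsonTypeInfo/@JsonSubTypes annotations let Jackson resolve the concrete payload class from the JSON "type" field, so a caller of the new getTaskPayload() can downcast after checking the type. The class and task id below are hypothetical:

import java.util.List;
import org.apache.druid.client.indexing.ClientCompactQuery;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.timeline.DataSegment;

class TaskPayloadSketch
{
  // Returns the segments a running compact task is working on, or null if the
  // payload is missing or belongs to some other task type.
  static List<DataSegment> segmentsOfCompactTask(IndexingServiceClient client, String taskId)
  {
    final TaskPayloadResponse response = client.getTaskPayload(taskId); // @Nullable per the interface
    if (response != null && response.getPayload() instanceof ClientCompactQuery) {
      // Jackson already resolved the "compact" type name to ClientCompactQuery.
      return ((ClientCompactQuery) response.getPayload()).getSegments();
    }
    return null;
  }
}
]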
diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java b/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
new file mode 100644
index 00000000000..1b938af3262
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TaskPayloadResponse
+{
+  private final String task;
+  private final ClientQuery payload;
+
+  @JsonCreator
+  public TaskPayloadResponse(
+      @JsonProperty("task") final String task,
+      @JsonProperty("payload") final ClientQuery payload
+  )
+  {
+    this.task = task;
+    this.payload = payload;
+  }
+
+  @JsonProperty
+  public String getTask()
+  {
+    return task;
+  }
+
+  @JsonProperty
+  public ClientQuery getPayload()
+  {
+    return payload;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskPayloadResponse{" +
+           "task='" + task + '\'' +
+           ", payload=" + payload +
+           '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
index db861630af8..fddec4b299f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
@@ -22,7 +22,9 @@ package org.apache.druid.server.coordinator.helper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,6 +37,7 @@ public interface CompactionSegmentSearchPolicy
    */
   CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   );
 }
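[Editor's note — sketch, not part of the patch. The new third argument to reset() maps each dataSource to intervals the policy must skip because running compact tasks already cover them. A hypothetical call shape; "configs" and "timelines" stand in for maps with the types declared above, and imports are elided since this is a fragment:

CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy();
Map<String, List<Interval>> skipIntervals = ImmutableMap.of(
    "wikipedia",                                             // hypothetical dataSource
    ImmutableList.of(Intervals.of("2018-12-20/2018-12-21"))  // interval of a running compact task
);
CompactionSegmentIterator iterator = policy.reset(configs, timelines, skipIntervals);
while (iterator.hasNext()) {
  List<DataSegment> segmentsToCompact = iterator.next();     // never from the skipped interval
}
]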
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 82be5c4458d..ec189c0c2cd 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -21,9 +21,13 @@ package org.apache.druid.server.coordinator.helper;
 
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import org.apache.druid.client.indexing.ClientCompactQuery;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -32,10 +36,12 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegmentUtils;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -77,23 +83,46 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
     Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
         .stream()
         .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-    final int numNonCompleteCompactionTasks = findNumNonCompleteCompactTasks(
+    final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
         indexingServiceClient.getRunningTasks(),
         indexingServiceClient.getPendingTasks(),
         indexingServiceClient.getWaitingTasks()
     );
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+    // dataSource -> list of intervals of compact tasks
+    final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
+    for (TaskStatusPlus status : compactTasks) {
+      final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
+      if (response == null) {
+        throw new ISE("WTH? got a null payload from overlord for task[%s]", status.getId());
+      }
+      if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
+        final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
+        final Interval interval = JodaUtils.umbrellaInterval(
+            compactQuery.getSegments()
+                        .stream()
+                        .map(DataSegment::getInterval)
+                        .sorted(Comparators.intervalsByStartThenEnd())
+                        .collect(Collectors.toList())
+        );
+        compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
+      } else {
+        throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
+      }
+    }
+
+    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, compactTaskIntervals);
 
     final int compactionTaskCapacity = (int) Math.min(
         indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
         dynamicConfig.getMaxCompactionTaskSlots()
     );
-    final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 ?
-                                                compactionTaskCapacity - numNonCompleteCompactionTasks :
+    final int numNonCompleteCompactionTasks = compactTasks.size();
+    final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0
+                                                ? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks)
                                                 // compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
                                                 // This guarantees that at least one slot is available if
                                                 // compaction is enabled and numRunningCompactTasks is 0.
-                                                Math.max(1, compactionTaskCapacity);
+                                                : Math.max(1, compactionTaskCapacity);
     LOG.info(
         "Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
         numAvailableCompactionTaskSlots,
@@ -117,7 +146,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
   }
 
   @SafeVarargs
-  private static int findNumNonCompleteCompactTasks(List<TaskStatusPlus>... taskStatusStreams)
+  private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>... taskStatusStreams)
   {
     final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
     Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
@@ -132,8 +161,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
           // performance.
           return taskType == null || COMPACT_TASK_TYPE.equals(taskType);
         })
-        .collect(Collectors.toList())
-        .size();
+        .collect(Collectors.toList());
   }
 
   private CoordinatorStats doRun(
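[Editor's note — worked example, not part of the patch. For each running compact task, the compactor collapses the task's segment intervals into a single umbrella interval, so a task whose segments are not contiguous still blocks the whole span it touches. A runnable illustration with made-up intervals:

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;

public class UmbrellaIntervalSketch
{
  public static void main(String[] args)
  {
    // Two non-adjacent segment intervals from one hypothetical compact task.
    // umbrellaInterval() spans earliest start to latest end, gap included, so
    // this prints 2018-12-01T00:00:00.000Z/2018-12-04T00:00:00.000Z (UTC defaults).
    System.out.println(
        JodaUtils.umbrellaInterval(
            ImmutableList.of(
                Intervals.of("2018-12-01/2018-12-02"),
                Intervals.of("2018-12-03/2018-12-04")
            )
        )
    );
  }
}
]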
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index d114fff0d7e..8c0d80d175f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -19,10 +19,12 @@
 
 package org.apache.druid.server.coordinator.helper;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -67,7 +69,8 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   NewestSegmentFirstIterator(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
     this.compactionConfigs = compactionConfigs;
@@ -80,9 +83,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
 
       if (config != null && !timeline.isEmpty()) {
-        final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
-        if (searchInterval != null) {
-          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        final List<Interval> searchIntervals = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
+        if (!searchIntervals.isEmpty()) {
+          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
         }
       }
     }
@@ -186,19 +189,22 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
     CompactibleTimelineObjectHolderCursor(
         VersionedIntervalTimeline<String, DataSegment> timeline,
-        Interval totalIntervalToSearch
+        List<Interval> totalIntervalsToSearch
     )
     {
-      this.holders = timeline
-          .lookup(totalIntervalToSearch)
+      this.holders = totalIntervalsToSearch
           .stream()
-          .filter(holder -> {
-            final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
-            final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
-            return chunks.size() > 0
-                   && partitionBytes > 0
-                   && totalIntervalToSearch.contains(chunks.get(0).getObject().getInterval());
-          })
+          .flatMap(interval -> timeline
+              .lookup(interval)
+              .stream()
+              .filter(holder -> {
+                final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
+                final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
+                return chunks.size() > 0
+                       && partitionBytes > 0
+                       && interval.contains(chunks.get(0).getObject().getInterval());
+              })
+          )
           .collect(Collectors.toList());
     }
@@ -339,15 +345,15 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   /**
    * Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
    *
-   * @param timeline   timeline of a dataSource
-   * @param skipOffset skipOFfset
+   * @param timeline      timeline of a dataSource
+   * @param skipIntervals intervals to skip
    *
    * @return found interval to search or null if it's not found
    */
-  @Nullable
-  private static Interval findInitialSearchInterval(
+  private static List<Interval> findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
-      Period skipOffset
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
   )
   {
     Preconditions.checkArgument(timeline != null && !timeline.isEmpty(), "timeline should not be null or empty");
@@ -355,35 +361,118 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
     final TimelineObjectHolder<String, DataSegment> first = Preconditions.checkNotNull(timeline.first(), "first");
     final TimelineObjectHolder<String, DataSegment> last = Preconditions.checkNotNull(timeline.last(), "last");
+    final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
+        last.getInterval().getEnd(),
+        skipOffset,
+        skipIntervals
+    );
 
-    final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
+    final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
+    final List<Interval> filteredInterval = filterSkipIntervals(totalInterval, fullSkipIntervals);
+    final List<Interval> searchIntervals = new ArrayList<>();
 
-    final DateTime lookupStart = first.getInterval().getStart();
-    final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
-    if (lookupStart.isBefore(lookupEnd)) {
+    for (Interval lookupInterval : filteredInterval) {
       final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
-          new Interval(lookupStart, lookupEnd)
+          new Interval(lookupInterval.getStart(), lookupInterval.getEnd())
       );
 
       final List<DataSegment> segments = holders
           .stream()
          .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
           .map(PartitionChunk::getObject)
-          .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+          .filter(segment -> lookupInterval.contains(segment.getInterval()))
           .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
           .collect(Collectors.toList());
 
-      if (segments.isEmpty()) {
-        return null;
-      } else {
-        return new Interval(
-            segments.get(0).getInterval().getStart(),
-            segments.get(segments.size() - 1).getInterval().getEnd()
+      if (!segments.isEmpty()) {
+        searchIntervals.add(
+            new Interval(
+                segments.get(0).getInterval().getStart(),
+                segments.get(segments.size() - 1).getInterval().getEnd()
+            )
         );
       }
-    } else {
-      return null;
     }
+
+    return searchIntervals;
+  }
+
+  @VisibleForTesting
+  static List<Interval> sortAndAddSkipIntervalFromLatest(
+      DateTime latest,
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
+  )
+  {
+    final List<Interval> nonNullSkipIntervals = skipIntervals == null
+                                                ? new ArrayList<>(1)
+                                                : new ArrayList<>(skipIntervals.size());
+
+    if (skipIntervals != null) {
+      final List<Interval> sortedSkipIntervals = new ArrayList<>(skipIntervals);
+      sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+      final List<Interval> overlapIntervals = new ArrayList<>();
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+
+      for (Interval interval : sortedSkipIntervals) {
+        if (interval.overlaps(skipFromLatest)) {
+          overlapIntervals.add(interval);
+        } else {
+          nonNullSkipIntervals.add(interval);
+        }
+      }
+
+      if (!overlapIntervals.isEmpty()) {
+        overlapIntervals.add(skipFromLatest);
+        nonNullSkipIntervals.add(JodaUtils.umbrellaInterval(overlapIntervals));
+      } else {
+        nonNullSkipIntervals.add(skipFromLatest);
+      }
+    } else {
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+      nonNullSkipIntervals.add(skipFromLatest);
+    }
+
+    return nonNullSkipIntervals;
+  }
+
+  /**
+   * Returns a list of intervals which are contained by totalInterval but don't overlap with skipIntervals.
+   *
+   * @param totalInterval total interval
+   * @param skipIntervals intervals to skip. This should be sorted by {@link Comparators#intervalsByStartThenEnd()}.
+   */
+  @VisibleForTesting
+  static List<Interval> filterSkipIntervals(Interval totalInterval, List<Interval> skipIntervals)
+  {
+    final List<Interval> filteredIntervals = new ArrayList<>(skipIntervals.size() + 1);
+
+    DateTime remainingStart = totalInterval.getStart();
+    DateTime remainingEnd = totalInterval.getEnd();
+    for (Interval skipInterval : skipIntervals) {
+      if (skipInterval.getStart().isBefore(remainingStart) && skipInterval.getEnd().isAfter(remainingStart)) {
+        remainingStart = skipInterval.getEnd();
+      } else if (skipInterval.getStart().isBefore(remainingEnd) && skipInterval.getEnd().isAfter(remainingEnd)) {
+        remainingEnd = skipInterval.getStart();
+      } else if (!remainingStart.isAfter(skipInterval.getStart()) && !remainingEnd.isBefore(skipInterval.getEnd())) {
+        filteredIntervals.add(new Interval(remainingStart, skipInterval.getStart()));
+        remainingStart = skipInterval.getEnd();
+      } else {
+        // Ignore this skipInterval
+        log.warn(
+            "skipInterval[%s] is not contained in remainingInterval[%s]",
+            skipInterval,
+            new Interval(remainingStart, remainingEnd)
+        );
+      }
+    }
+
+    if (!remainingStart.equals(remainingEnd)) {
+      filteredIntervals.add(new Interval(remainingStart, remainingEnd));
+    }
+
+    return filteredIntervals;
   }
 
   private static class QueueEntry
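[Editor's note — sketch, not part of the patch. filterSkipIntervals() is effectively interval subtraction: it returns the parts of totalInterval not covered by the sorted skip list. Since the method is package-private, this hypothetical snippet lives in the same package:

package org.apache.druid.server.coordinator.helper;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;

public class FilterSkipIntervalsSketch
{
  public static void main(String[] args)
  {
    // Subtracting 2018-03-01/2018-04-01 from 2018-01-01/2019-01-01 leaves
    // [2018-01-01/2018-03-01, 2018-04-01/2019-01-01].
    System.out.println(
        NewestSegmentFirstIterator.filterSkipIntervals(
            Intervals.of("2018-01-01/2019-01-01"),
            ImmutableList.of(Intervals.of("2018-03-01/2018-04-01"))
        )
    );
  }
}
]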
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
index 19b882e0097..8a4118221d0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
@@ -22,7 +22,9 @@ package org.apache.druid.server.coordinator.helper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,9 +35,10 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
   @Override
   public CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
-    return new NewestSegmentFirstIterator(compactionConfigs, dataSources);
+    return new NewestSegmentFirstIterator(compactionConfigs, dataSources, skipIntervals);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index d395a6c2efa..01f84df01f3 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -109,4 +109,10 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   {
     return null;
   }
+
+  @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    return null;
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java
new file mode 100644
index 00000000000..0e5f3339709
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.helper;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class NewestSegmentFirstIteratorTest
+{
+  @Test
+  public void testFilterSkipIntervals()
+  {
+    final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
+    final List<Interval> expectedSkipIntervals = ImmutableList.of(
+        Intervals.of("2018-01-15/2018-03-02"),
+        Intervals.of("2018-07-23/2018-10-01"),
+        Intervals.of("2018-10-02/2018-12-25"),
+        Intervals.of("2018-12-31/2019-01-01")
+    );
+    final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
+        totalInterval,
+        Lists.newArrayList(
+            Intervals.of("2017-12-01/2018-01-15"),
+            Intervals.of("2018-03-02/2018-07-23"),
+            Intervals.of("2018-10-01/2018-10-02"),
+            Intervals.of("2018-12-25/2018-12-31")
+        )
+    );
+
+    Assert.assertEquals(expectedSkipIntervals, skipIntervals);
+  }
+
+  @Test
+  public void testAddSkipIntervalFromLatestAndSort()
+  {
+    final List<Interval> expectedIntervals = ImmutableList.of(
+        Intervals.of("2018-12-24/2018-12-25"),
+        Intervals.of("2018-12-29/2019-01-01")
+    );
+    final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
+        DateTimes.of("2019-01-01"),
+        new Period(72, 0, 0, 0),
+        ImmutableList.of(
+            Intervals.of("2018-12-30/2018-12-31"),
+            Intervals.of("2018-12-24/2018-12-25")
+        )
+    );
+
+    Assert.assertEquals(expectedIntervals, fullSkipIntervals);
+  }
+}
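[Editor's note — walkthrough of the test above, not part of the patch. new Period(72, 0, 0, 0) is 72 hours, so with latest = 2019-01-01 the implicit skip-from-latest window is 2018-12-29/2019-01-01. The input 2018-12-30/2018-12-31 overlaps that window and is absorbed into its umbrella interval, while 2018-12-24/2018-12-25 does not overlap and is kept, sorted first. Restated as a runnable snippet in the same (package-private) scope:

package org.apache.druid.server.coordinator.helper;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Period;

public class SkipFromLatestSketch
{
  public static void main(String[] args)
  {
    // Prints the two expected intervals: 2018-12-24/2018-12-25 followed by
    // the merged 2018-12-29/2019-01-01 window.
    System.out.println(
        NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
            DateTimes.of("2019-01-01"),
            new Period(72, 0, 0, 0),
            ImmutableList.of(
                Intervals.of("2018-12-30/2018-12-31"),
                Intervals.of("2018-12-24/2018-12-25")
            )
        )
    );
  }
}
]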
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 5ddbaab6cd9..8fe3e37cfa4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -78,7 +78,8 @@ public class NewestSegmentFirstPolicyTest
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -102,7 +103,8 @@ public class NewestSegmentFirstPolicyTest
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -178,7 +180,8 @@ public class NewestSegmentFirstPolicyTest
                 // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -211,7 +214,8 @@ public class NewestSegmentFirstPolicyTest
                 // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
            )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -258,7 +262,8 @@ public class NewestSegmentFirstPolicyTest
                     DEFAULT_NUM_SEGMENTS_PER_SHARD
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -313,7 +318,8 @@ public class NewestSegmentFirstPolicyTest
                     80
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -392,7 +398,8 @@ public class NewestSegmentFirstPolicyTest
                     150
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -416,7 +423,8 @@ public class NewestSegmentFirstPolicyTest
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
            )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -449,7 +457,8 @@ public class NewestSegmentFirstPolicyTest
                     1
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -481,7 +490,8 @@ public class NewestSegmentFirstPolicyTest
     );
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     final List<DataSegment> expectedSegmentsToCompact = timeline
@@ -510,7 +520,8 @@ public class NewestSegmentFirstPolicyTest
 
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -530,12 +541,53 @@ public class NewestSegmentFirstPolicyTest
 
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testWithSkipIntervals()
+  {
+    final Period segmentPeriod = new Period("PT1H");
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            createTimeline(
+                new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
+                new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
+            )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            ImmutableList.of(
+                Intervals.of("2017-11-16T00:00:00/2017-11-17T00:00:00"),
+                Intervals.of("2017-11-15T00:00:00/2017-11-15T20:00:00"),
+                Intervals.of("2017-11-13T00:00:00/2017-11-14T01:00:00")
+            )
+        )
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-15T20:00:00/2017-11-15T21:00:00"),
+        Intervals.of("2017-11-15T23:00:00/2017-11-16T00:00:00"),
+        false
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-14T01:00:00/2017-11-14T02:00:00"),
+        Intervals.of("2017-11-14T23:00:00/2017-11-15T00:00:00"),
+        true
+    );
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,