From 01d67ae543bec181341a1136cd4e4feecf4bf66e Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 16 Jul 2024 07:24:49 -0700 Subject: [PATCH] Allow CompactionSegmentIterator to have custom priority (#16737) Changes: - Break `NewestSegmentFirstIterator` into two parts - `DatasourceCompactibleSegmentIterator` - this contains all the code from `NewestSegmentFirstIterator` but now handles a single datasource and allows a priority to be specified - `PriorityBasedCompactionSegmentIterator` - contains separate iterator for each datasource and combines the results into a single queue to be used by a compaction search policy - Update `NewestSegmentFirstPolicy` to use the above new classes - Cleanup `CompactionStatistics` and `AutoCompactionSnapshot` - Cleanup `CompactSegments` - Remove unused methods from `Tasks` - Remove unneeded `TasksTest` - Move tests from `NewestSegmentFirstIteratorTest` to `CompactionStatusTest` and `DatasourceCompactibleSegmentIteratorTest` --- .../NewestSegmentFirstPolicyBenchmark.java | 2 +- .../druid/indexing/common/task/Tasks.java | 38 +- .../druid/indexing/common/task/TasksTest.java | 94 ---- ...ClientCompactionTaskQueryTuningConfig.java | 12 + .../coordinator/AutoCompactionSnapshot.java | 91 +--- .../CompactionSegmentSearchPolicy.java | 4 +- .../compact/CompactionStatistics.java | 16 +- .../coordinator/compact/CompactionStatus.java | 7 +- ...DataSourceCompactibleSegmentIterator.java} | 137 ++--- .../compact/NewestSegmentFirstPolicy.java | 15 +- ...riorityBasedCompactionSegmentIterator.java | 135 +++++ .../compact/SegmentsToCompact.java | 4 +- .../coordinator/duty/CompactSegments.java | 108 ++-- .../AutoCompactionSnapshotTest.java | 15 +- .../compact/CompactionStatusTest.java | 178 +++++++ ...aSourceCompactibleSegmentIteratorTest.java | 77 +++ .../NewestSegmentFirstIteratorTest.java | 477 ------------------ .../compact/NewestSegmentFirstPolicyTest.java | 112 ++-- 18 files changed, 587 insertions(+), 935 deletions(-) delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/TasksTest.java rename server/src/main/java/org/apache/druid/server/coordinator/compact/{NewestSegmentFirstIterator.java => DataSourceCompactibleSegmentIterator.java} (82%) create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/compact/CompactionStatusTest.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIteratorTest.java delete mode 100644 server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index 37f7b665927..98c27c4b2b8 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -141,7 +141,7 @@ public class NewestSegmentFirstPolicyBenchmark @Benchmark public void measureNewestSegmentFirstPolicy(Blackhole blackhole) { - final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap()); + final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap()); for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) { blackhole.consume(iterator.next()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java index b357125dc4d..a8f268a67f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java @@ -21,16 +21,9 @@ package org.apache.druid.indexing.common.task; import org.apache.curator.shaded.com.google.common.base.Verify; import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.java.util.common.JodaUtils; -import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.duty.CompactSegments; -import org.joda.time.Interval; -import java.util.ArrayList; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.TimeUnit; public class Tasks @@ -63,44 +56,19 @@ public class Tasks * Context flag denoting if maximum possible values should be used to estimate * on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for * more details. - * + *

* The value of this flag is true by default which corresponds to the old method * of estimation. */ public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates"; /** - * This context is used in compaction. When it is set in the context, the segments created by the task - * will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not. - * See {@link org.apache.druid.timeline.DataSegment} and {@link - * org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details. + * Context flag to denote if segments published to metadata by a task should + * have the {@code lastCompactionState} field set. */ public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState"; static { Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY)); } - - public static SortedSet computeCondensedIntervals(SortedSet intervals) - { - final SortedSet condensedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); - List toBeAccumulated = new ArrayList<>(); - for (Interval interval : intervals) { - if (toBeAccumulated.size() == 0) { - toBeAccumulated.add(interval); - } else { - if (toBeAccumulated.get(toBeAccumulated.size() - 1).abuts(interval)) { - toBeAccumulated.add(interval); - } else { - condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated)); - toBeAccumulated.clear(); - toBeAccumulated.add(interval); - } - } - } - if (toBeAccumulated.size() > 0) { - condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated)); - } - return condensedIntervals; - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TasksTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TasksTest.java deleted file mode 100644 index 248df1db44d..00000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TasksTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.task; - -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.Comparators; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Iterator; -import java.util.SortedSet; -import java.util.TreeSet; - -public class TasksTest -{ - - @Test - public void testComputeCondensedIntervals() - { - final SortedSet inputIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); - for (int m = 1; m < 13; m++) { - for (int d = 1; d < 10; d++) { - inputIntervals.add(getInterval(m, d, m, d + 1)); - } - - for (int d = 12; d < 20; d++) { - inputIntervals.add(getInterval(m, d, m, d + 1)); - } - - inputIntervals.add(getInterval(m, 22, m, 23)); - - for (int d = 25; d < 28; d++) { - inputIntervals.add(getInterval(m, d, m, d + 1)); - } - - if (m == 1 || m == 3 || m == 5 || m == 7 || m == 8 || m == 10) { - inputIntervals.add(getInterval(m, 31, m + 1, 1)); - } - } - - inputIntervals.add(Intervals.of("2017-12-31/2018-01-01")); - - final SortedSet condensedIntervals = Tasks.computeCondensedIntervals(inputIntervals); - final Iterator condensedIntervalIterator = condensedIntervals.iterator(); - Assert.assertTrue(condensedIntervalIterator.hasNext()); - - Interval condensedInterval = condensedIntervalIterator.next(); - final SortedSet checkedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); - for (Interval inputInterval : inputIntervals) { - if (!condensedInterval.contains(inputInterval)) { - if (condensedIntervalIterator.hasNext()) { - condensedInterval = condensedIntervalIterator.next(); - Assert.assertTrue(condensedInterval.contains(inputInterval)); - } - } - checkedIntervals.add(inputInterval); - } - - Assert.assertFalse(condensedIntervalIterator.hasNext()); - Assert.assertEquals(inputIntervals, checkedIntervals); - } - - private static Interval getInterval(int startMonth, int startDay, int endMonth, int endDay) - { - return Intervals.of( - StringUtils.format( - "2017-%02d-%02d/2017-%02d-%02d", - startMonth, - startDay, - endMonth, - endDay - ) - ); - } -} diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java index 7b1a7c54682..55fe7d0114f 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.joda.time.Duration; @@ -79,6 +80,17 @@ public class ClientCompactionTaskQueryTuningConfig @Nullable private final AppendableIndexSpec appendableIndexSpec; + public static ClientCompactionTaskQueryTuningConfig from( + DataSourceCompactionConfig compactionConfig + ) + { + if (compactionConfig == null) { + return from(null, null, null); + } else { + return from(compactionConfig.getTuningConfig(), compactionConfig.getMaxRowsPerSegment(), null); + } + } + public static ClientCompactionTaskQueryTuningConfig from( @Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig, @Nullable Integer maxRowsPerSegment, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java index fe46eabb426..d52d4e9eba0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java @@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.server.coordinator.compact.CompactionStatistics; import javax.validation.constraints.NotNull; import java.util.Objects; @@ -193,15 +194,9 @@ public class AutoCompactionSnapshot private final String dataSource; private final AutoCompactionScheduleStatus scheduleStatus; - private long bytesAwaitingCompaction; - private long bytesCompacted; - private long bytesSkipped; - private long segmentCountAwaitingCompaction; - private long segmentCountCompacted; - private long segmentCountSkipped; - private long intervalCountAwaitingCompaction; - private long intervalCountCompacted; - private long intervalCountSkipped; + private final CompactionStatistics compactedStats = new CompactionStatistics(); + private final CompactionStatistics skippedStats = new CompactionStatistics(); + private final CompactionStatistics waitingStats = new CompactionStatistics(); private Builder( @NotNull String dataSource, @@ -217,69 +212,21 @@ public class AutoCompactionSnapshot this.dataSource = dataSource; this.scheduleStatus = scheduleStatus; - this.bytesAwaitingCompaction = 0; - this.bytesCompacted = 0; - this.bytesSkipped = 0; - this.segmentCountAwaitingCompaction = 0; - this.segmentCountCompacted = 0; - this.segmentCountSkipped = 0; - this.intervalCountAwaitingCompaction = 0; - this.intervalCountCompacted = 0; - this.intervalCountSkipped = 0; } - public Builder incrementBytesAwaitingCompaction(long incrementValue) + public void incrementWaitingStats(CompactionStatistics entry) { - this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue; - return this; + waitingStats.increment(entry); } - public Builder incrementBytesCompacted(long incrementValue) + public void incrementCompactedStats(CompactionStatistics entry) { - this.bytesCompacted = this.bytesCompacted + incrementValue; - return this; + compactedStats.increment(entry); } - public Builder incrementSegmentCountAwaitingCompaction(long incrementValue) + public void incrementSkippedStats(CompactionStatistics entry) { - this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue; - return this; - } - - public Builder incrementSegmentCountCompacted(long incrementValue) - { - this.segmentCountCompacted = this.segmentCountCompacted + incrementValue; - return this; - } - - public Builder incrementIntervalCountAwaitingCompaction(long incrementValue) - { - this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue; - return this; - } - - public Builder incrementIntervalCountCompacted(long incrementValue) - { - this.intervalCountCompacted = this.intervalCountCompacted + incrementValue; - return this; - } - - public Builder incrementBytesSkipped(long incrementValue) - { - this.bytesSkipped = this.bytesSkipped + incrementValue; - return this; - } - - public Builder incrementSegmentCountSkipped(long incrementValue) - { - this.segmentCountSkipped = this.segmentCountSkipped + incrementValue; - return this; - } - - public Builder incrementIntervalCountSkipped(long incrementValue) - { - this.intervalCountSkipped = this.intervalCountSkipped + incrementValue; - return this; + skippedStats.increment(entry); } public AutoCompactionSnapshot build() @@ -287,15 +234,15 @@ public class AutoCompactionSnapshot return new AutoCompactionSnapshot( dataSource, scheduleStatus, - bytesAwaitingCompaction, - bytesCompacted, - bytesSkipped, - segmentCountAwaitingCompaction, - segmentCountCompacted, - segmentCountSkipped, - intervalCountAwaitingCompaction, - intervalCountCompacted, - intervalCountSkipped + waitingStats.getTotalBytes(), + compactedStats.getTotalBytes(), + skippedStats.getTotalBytes(), + waitingStats.getNumSegments(), + compactedStats.getNumSegments(), + skippedStats.getNumSegments(), + waitingStats.getNumIntervals(), + compactedStats.getNumIntervals(), + skippedStats.getNumIntervals() ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java index 5a006908c31..cc5f4f59d85 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java @@ -33,9 +33,9 @@ import java.util.Map; public interface CompactionSegmentSearchPolicy { /** - * Reset the current states of this policy. This method should be called whenever iterating starts. + * Creates an iterator that returns compactible segments. */ - CompactionSegmentIterator reset( + CompactionSegmentIterator createIterator( Map compactionConfigs, Map dataSources, Map> skipIntervals diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java index dd672ce4480..6997dec47c0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java @@ -28,9 +28,13 @@ public class CompactionStatistics private long numSegments; private long numIntervals; - public static CompactionStatistics create() + public static CompactionStatistics create(long bytes, long numSegments, long numIntervals) { - return new CompactionStatistics(); + final CompactionStatistics stats = new CompactionStatistics(); + stats.totalBytes = bytes; + stats.numIntervals = numIntervals; + stats.numSegments = numSegments; + return stats; } public long getTotalBytes() @@ -48,10 +52,10 @@ public class CompactionStatistics return numIntervals; } - public void addFrom(SegmentsToCompact segments) + public void increment(CompactionStatistics other) { - totalBytes += segments.getTotalBytes(); - numIntervals += segments.getNumIntervals(); - numSegments += segments.size(); + totalBytes += other.getTotalBytes(); + numIntervals += other.getNumIntervals(); + numSegments += other.getNumSegments(); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java index 862f2e7c5b4..fa053fb8d6a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java @@ -167,12 +167,7 @@ public class CompactionStatus this.objectMapper = objectMapper; this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState(); this.compactionConfig = compactionConfig; - this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from( - compactionConfig.getTuningConfig(), - compactionConfig.getMaxRowsPerSegment(), - null - ); - + this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig); this.configuredGranularitySpec = compactionConfig.getGranularitySpec(); if (lastCompactionState == null) { this.existingGranularitySpec = null; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java similarity index 82% rename from server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java rename to server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java index c4ae771f808..c086be3112b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java @@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; @@ -59,44 +57,46 @@ import java.util.Set; import java.util.stream.Collectors; /** - * This class iterates all segments of the dataSources configured for compaction from the newest to the oldest. + * Iterator over compactible segments of a datasource in order of specified priority. */ -public class NewestSegmentFirstIterator implements CompactionSegmentIterator +public class DataSourceCompactibleSegmentIterator implements Iterator { - private static final Logger log = new Logger(NewestSegmentFirstIterator.class); + private static final Logger log = new Logger(DataSourceCompactibleSegmentIterator.class); + private final String dataSource; private final ObjectMapper objectMapper; - private final Map compactionConfigs; - private final Map compactedSegmentStats = new HashMap<>(); - private final Map skippedSegmentStats = new HashMap<>(); - - private final Map timelineIterators; + private final DataSourceCompactionConfig config; + private final CompactionStatistics compactedSegmentStats = new CompactionStatistics(); + private final CompactionStatistics skippedSegmentStats = new CompactionStatistics(); // This is needed for datasource that has segmentGranularity configured // If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments // can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each // run of the compaction job and skip any interval that was already previously compacted. - private final Map> intervalCompactedForDatasource = new HashMap<>(); + private final Set compactedIntervals = new HashSet<>(); - private final PriorityQueue queue = new PriorityQueue<>( - (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()) - ); + private final PriorityQueue queue; - NewestSegmentFirstIterator( - ObjectMapper objectMapper, - Map compactionConfigs, - Map dataSources, - Map> skipIntervals + public DataSourceCompactibleSegmentIterator( + DataSourceCompactionConfig config, + SegmentTimeline timeline, + List skipIntervals, + Comparator segmentPriority, + ObjectMapper objectMapper ) { this.objectMapper = objectMapper; - this.compactionConfigs = compactionConfigs; - this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size()); + this.config = config; + this.dataSource = config.getDataSource(); + this.queue = new PriorityQueue<>(segmentPriority); + populateQueue(timeline, skipIntervals); + } - dataSources.forEach((dataSource, timeline) -> { - final DataSourceCompactionConfig config = compactionConfigs.get(dataSource); + private void populateQueue(SegmentTimeline timeline, List skipIntervals) + { + if (timeline != null) { Granularity configuredSegmentGranularity = null; - if (config != null && !timeline.isEmpty()) { + if (!timeline.isEmpty()) { SegmentTimeline originalTimeline = null; if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) { String temporaryVersion = DateTimes.nowUtc().toString(); @@ -154,33 +154,25 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator timeline, config.getSkipOffsetFromLatest(), configuredSegmentGranularity, - skipIntervals.get(dataSource) + skipIntervals ); if (!searchIntervals.isEmpty()) { - timelineIterators.put( - dataSource, + findAndEnqueueSegmentsToCompact( new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline) ); + } else { + log.warn("Skipping compaction for datasource[%s] as it has no compactible segments.", dataSource); } } - }); - - compactionConfigs.forEach((dataSourceName, config) -> { - if (config == null) { - throw new ISE("Unknown dataSource[%s]", dataSourceName); - } - updateQueue(dataSourceName, config); - }); + } } - @Override - public Map totalCompactedStatistics() + public CompactionStatistics totalCompactedStatistics() { return compactedSegmentStats; } - @Override - public Map totalSkippedStatistics() + public CompactionStatistics totalSkippedStatistics() { return skippedSegmentStats; } @@ -206,25 +198,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator final List resultSegments = entry.getSegments(); Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty"); - final String dataSource = resultSegments.get(0).getDataSource(); - updateQueue(dataSource, compactionConfigs.get(dataSource)); - return entry; } - /** - * Find the next segments to compact for the given dataSource and add them to the queue. - * {@link #timelineIterators} is updated according to the found segments. That is, the found segments are removed from - * the timeline of the given dataSource. - */ - private void updateQueue(String dataSourceName, DataSourceCompactionConfig config) - { - final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config); - if (!segmentsToCompact.isEmpty()) { - queue.add(segmentsToCompact); - } - } - /** * Iterates compactible segments in a {@link SegmentTimeline}. */ @@ -315,27 +291,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator } /** - * Finds segments to compact together for the given datasource. - * - * @return An empty {@link SegmentsToCompact} if there are no eligible candidates. + * Finds segments to compact together for the given datasource and adds them to + * the priority queue. */ - private SegmentsToCompact findSegmentsToCompact( - final String dataSourceName, - final DataSourceCompactionConfig config - ) + private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compactibleSegmentIterator) { - final CompactibleSegmentIterator compactibleSegmentIterator - = timelineIterators.get(dataSourceName); - if (compactibleSegmentIterator == null) { - log.warn( - "Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.", - dataSourceName - ); - return SegmentsToCompact.empty(); - } - final long inputSegmentSize = config.getInputSegmentSizeBytes(); - while (compactibleSegmentIterator.hasNext()) { List segments = compactibleSegmentIterator.next(); @@ -352,47 +313,33 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator if (!compactionStatus.isComplete()) { log.debug( "Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].", - dataSourceName, interval, candidates.size(), compactionStatus.getReasonToCompact() + dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact() ); } if (compactionStatus.isComplete()) { - addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates); + compactedSegmentStats.increment(candidates.getStats()); } else if (candidates.getTotalBytes() > inputSegmentSize) { - addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates); + skippedSegmentStats.increment(candidates.getStats()); log.warn( "Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]" + " is larger than allowed inputSegmentSize[%d].", - dataSourceName, interval, candidates.getTotalBytes(), inputSegmentSize + dataSource, interval, candidates.getTotalBytes(), inputSegmentSize ); } else if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) { - Set compactedIntervals = intervalCompactedForDatasource - .computeIfAbsent(dataSourceName, k -> new HashSet<>()); - if (compactedIntervals.contains(interval)) { // Skip these candidate segments as we have already compacted this interval } else { compactedIntervals.add(interval); - return candidates; + queue.add(candidates); } } else { - return candidates; + queue.add(candidates); } } - log.debug("No more segments to compact for datasource[%s].", dataSourceName); - return SegmentsToCompact.empty(); - } - - private void addSegmentStatsTo( - Map statisticsMap, - String dataSourceName, - SegmentsToCompact segments - ) - { - statisticsMap.computeIfAbsent(dataSourceName, v -> CompactionStatistics.create()) - .addFrom(segments); + log.debug("No more segments to compact for datasource[%s].", dataSource); } /** @@ -428,7 +375,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator final List segments = new ArrayList<>( timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE) ); - addSegmentStatsTo(skippedSegmentStats, dataSourceName, SegmentsToCompact.from(segments)); + skippedSegmentStats.increment(SegmentsToCompact.from(segments).getStats()); } final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd()); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java index 20f6d920441..bc923da4f80 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java @@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.compact; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; +import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.SegmentTimeline; import org.joda.time.Interval; @@ -29,7 +30,7 @@ import java.util.List; import java.util.Map; /** - * This policy searches segments for compaction from the newest one to oldest one. + * This policy searches segments for compaction from newest to oldest. */ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy { @@ -42,12 +43,20 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy } @Override - public CompactionSegmentIterator reset( + public CompactionSegmentIterator createIterator( Map compactionConfigs, Map dataSources, Map> skipIntervals ) { - return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals); + return new PriorityBasedCompactionSegmentIterator( + compactionConfigs, + dataSources, + skipIntervals, + (o1, o2) -> Comparators.intervalsByStartThenEnd() + .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()), + objectMapper + ); } + } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java new file mode 100644 index 00000000000..33aea2a0451 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.compact; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.timeline.SegmentTimeline; +import org.apache.druid.utils.CollectionUtils; +import org.joda.time.Interval; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; + +/** + * Implementation of {@link CompactionSegmentIterator} that returns segments in + * order of their priority. + */ +public class PriorityBasedCompactionSegmentIterator implements CompactionSegmentIterator +{ + private static final Logger log = new Logger(PriorityBasedCompactionSegmentIterator.class); + + private final PriorityQueue queue; + private final Map datasourceIterators; + + public PriorityBasedCompactionSegmentIterator( + Map compactionConfigs, + Map datasourceToTimeline, + Map> skipIntervals, + Comparator segmentPriority, + ObjectMapper objectMapper + ) + { + this.queue = new PriorityQueue<>(segmentPriority); + this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size()); + compactionConfigs.forEach((datasource, config) -> { + if (config == null) { + throw DruidException.defensive("Invalid null compaction config for dataSource[%s].", datasource); + } + final SegmentTimeline timeline = datasourceToTimeline.get(datasource); + if (timeline == null) { + log.warn("Skipping compaction for datasource[%s] as it has no timeline.", datasource); + return; + } + + datasourceIterators.put( + datasource, + new DataSourceCompactibleSegmentIterator( + compactionConfigs.get(datasource), + timeline, + skipIntervals.getOrDefault(datasource, Collections.emptyList()), + segmentPriority, + objectMapper + ) + ); + addNextItemForDatasourceToQueue(datasource); + }); + } + + @Override + public Map totalCompactedStatistics() + { + return CollectionUtils.mapValues( + datasourceIterators, + DataSourceCompactibleSegmentIterator::totalCompactedStatistics + ); + } + + @Override + public Map totalSkippedStatistics() + { + return CollectionUtils.mapValues( + datasourceIterators, + DataSourceCompactibleSegmentIterator::totalSkippedStatistics + ); + } + + @Override + public boolean hasNext() + { + return !queue.isEmpty(); + } + + @Override + public SegmentsToCompact next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final SegmentsToCompact entry = queue.poll(); + if (entry == null) { + throw new NoSuchElementException(); + } + Preconditions.checkState(!entry.isEmpty(), "Queue entry must not be empty"); + + addNextItemForDatasourceToQueue(entry.getFirst().getDataSource()); + return entry; + } + + private void addNextItemForDatasourceToQueue(String dataSourceName) + { + final DataSourceCompactibleSegmentIterator iterator = datasourceIterators.get(dataSourceName); + if (iterator.hasNext()) { + final SegmentsToCompact segmentsToCompact = iterator.next(); + if (!segmentsToCompact.isEmpty()) { + queue.add(segmentsToCompact); + } + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java index 1bc53b7dbe7..27ce9beab81 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java @@ -107,9 +107,9 @@ public class SegmentsToCompact return umbrellaInterval; } - public long getNumIntervals() + public CompactionStatistics getStats() { - return numIntervals; + return CompactionStatistics.create(totalBytes, size(), numIntervals); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 90fb684db6c..01f3bc77e9e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -54,7 +54,6 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator; import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy; -import org.apache.druid.server.coordinator.compact.CompactionStatistics; import org.apache.druid.server.coordinator.compact.SegmentsToCompact; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Dimension; @@ -87,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty private static final Logger LOG = new Logger(CompactSegments.class); + private static final String TASK_ID_PREFIX = "coordinator-issued"; private static final Predicate IS_COMPACTION_TASK = status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType()); @@ -196,7 +196,7 @@ public class CompactSegments implements CoordinatorCustomDuty // Get iterator over segments to compact and submit compaction tasks Map dataSources = params.getUsedSegmentsTimelinesPerDataSource(); final CompactionSegmentIterator iterator = - policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction); + policy.createIterator(compactionConfigs, dataSources, intervalsToSkipCompaction); final int compactionTaskCapacity = getCompactionTaskCapacity(dynamicConfig); final int availableCompactionTaskSlots @@ -215,7 +215,7 @@ public class CompactSegments implements CoordinatorCustomDuty stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity); stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots); stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks); - addCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats); + updateCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats); return params; } @@ -392,28 +392,19 @@ public class CompactSegments implements CoordinatorCustomDuty while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) { final SegmentsToCompact entry = iterator.next(); - final List segmentsToCompact = entry.getSegments(); - if (segmentsToCompact.isEmpty()) { + if (entry.isEmpty()) { throw new ISE("segmentsToCompact is empty?"); } - final String dataSourceName = segmentsToCompact.get(0).getDataSource(); + final String dataSourceName = entry.getFirst().getDataSource(); // As these segments will be compacted, we will aggregate the statistic to the Compacted statistics - AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent( - dataSourceName, - AutoCompactionSnapshot::builder - ); - snapshotBuilder - .incrementBytesCompacted( - segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum() - ) - .incrementIntervalCountCompacted( - segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count() - ) - .incrementSegmentCountCompacted(segmentsToCompact.size()); + currentRunAutoCompactionSnapshotBuilders + .computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder) + .incrementCompactedStats(entry.getStats()); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); + final List segmentsToCompact = entry.getSegments(); // Create granularitySpec to send to compaction task ClientCompactionTaskGranularitySpec granularitySpec; @@ -514,7 +505,6 @@ public class CompactSegments implements CoordinatorCustomDuty } final String taskId = compactSegments( - "coordinator-issued", segmentsToCompact, config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from( @@ -536,7 +526,6 @@ public class CompactSegments implements CoordinatorCustomDuty taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval() ); LOG.debugSegments(segmentsToCompact, "Compacting segments"); - // Count the compaction task itself + its sub tasks numSubmittedTasks++; totalTaskSlotsAssigned += slotsRequiredForCurrentTask; } @@ -554,7 +543,7 @@ public class CompactSegments implements CoordinatorCustomDuty return newContext; } - private void addCompactionSnapshotStats( + private void updateCompactionSnapshotStats( Map currentRunAutoCompactionSnapshotBuilders, CompactionSegmentIterator iterator, CoordinatorRunStats stats @@ -563,77 +552,45 @@ public class CompactSegments implements CoordinatorCustomDuty // Mark all the segments remaining in the iterator as "awaiting compaction" while (iterator.hasNext()) { final SegmentsToCompact entry = iterator.next(); - final List segmentsToCompact = entry.getSegments(); - if (!segmentsToCompact.isEmpty()) { - final String dataSourceName = segmentsToCompact.get(0).getDataSource(); - AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent( - dataSourceName, - AutoCompactionSnapshot::builder - ); - snapshotBuilder - .incrementBytesAwaitingCompaction( - segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum() - ) - .incrementIntervalCountAwaitingCompaction( - segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count() - ) - .incrementSegmentCountAwaitingCompaction(segmentsToCompact.size()); + if (!entry.isEmpty()) { + final String dataSourceName = entry.getFirst().getDataSource(); + currentRunAutoCompactionSnapshotBuilders + .computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder) + .incrementWaitingStats(entry.getStats()); } } // Statistics of all segments considered compacted after this run - Map allCompactedStatistics = iterator.totalCompactedStatistics(); - for (Map.Entry compactionStatisticsEntry : allCompactedStatistics.entrySet()) { - final String dataSource = compactionStatisticsEntry.getKey(); - final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue(); - AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent( - dataSource, - AutoCompactionSnapshot::builder - ); - builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes()); - builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments()); - builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals()); - } + iterator.totalCompactedStatistics().forEach((dataSource, compactedStats) -> { + currentRunAutoCompactionSnapshotBuilders + .computeIfAbsent(dataSource, AutoCompactionSnapshot::builder) + .incrementCompactedStats(compactedStats); + }); // Statistics of all segments considered skipped after this run - Map allSkippedStatistics = iterator.totalSkippedStatistics(); - for (Map.Entry compactionStatisticsEntry : allSkippedStatistics.entrySet()) { - final String dataSource = compactionStatisticsEntry.getKey(); - final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue(); - AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent( - dataSource, - AutoCompactionSnapshot::builder - ); - builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes()) - .incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments()) - .incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals()); - } + iterator.totalSkippedStatistics().forEach((dataSource, dataSourceSkippedStatistics) -> { + currentRunAutoCompactionSnapshotBuilders + .computeIfAbsent(dataSource, AutoCompactionSnapshot::builder) + .incrementSkippedStats(dataSourceSkippedStatistics); + }); final Map currentAutoCompactionSnapshotPerDataSource = new HashMap<>(); - for (Map.Entry autoCompactionSnapshotBuilderEntry - : currentRunAutoCompactionSnapshotBuilders.entrySet()) { - final String dataSource = autoCompactionSnapshotBuilderEntry.getKey(); - final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue(); - - // Build the complete snapshot for the datasource - AutoCompactionSnapshot autoCompactionSnapshot = builder.build(); + currentRunAutoCompactionSnapshotBuilders.forEach((dataSource, builder) -> { + final AutoCompactionSnapshot autoCompactionSnapshot = builder.build(); currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot); - - // Use the complete snapshot to emit metrics - addStatsForDatasource(dataSource, autoCompactionSnapshot, stats); - } + collectSnapshotStats(autoCompactionSnapshot, stats); + }); // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource); } - private void addStatsForDatasource( - String dataSource, + private void collectSnapshotStats( AutoCompactionSnapshot autoCompactionSnapshot, CoordinatorRunStats stats ) { - final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource); + final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, autoCompactionSnapshot.getDataSource()); stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction()); stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction()); @@ -668,7 +625,6 @@ public class CompactSegments implements CoordinatorCustomDuty } private String compactSegments( - String idPrefix, List segments, int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @@ -692,7 +648,7 @@ public class CompactSegments implements CoordinatorCustomDuty context = context == null ? new HashMap<>() : context; context.put("priority", compactionTaskPriority); - final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null); + final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null); final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity(); final ClientTaskQuery taskPayload = new ClientCompactionTaskQuery( taskId, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java index e034459fc74..5517bf9e6a4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import org.apache.druid.server.coordinator.compact.CompactionStatistics; import org.junit.Assert; import org.junit.Test; @@ -30,17 +31,11 @@ public class AutoCompactionSnapshotTest final String expectedDataSource = "data"; final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource); - // Increment every stats twice + // Increment every stat twice for (int i = 0; i < 2; i++) { - builder.incrementIntervalCountSkipped(13) - .incrementBytesSkipped(13) - .incrementSegmentCountSkipped(13) - .incrementIntervalCountCompacted(13) - .incrementBytesCompacted(13) - .incrementSegmentCountCompacted(13) - .incrementIntervalCountAwaitingCompaction(13) - .incrementBytesAwaitingCompaction(13) - .incrementSegmentCountAwaitingCompaction(13); + builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13)); + builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13)); + builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13)); } final AutoCompactionSnapshot actual = builder.build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/compact/CompactionStatusTest.java b/server/src/test/java/org/apache/druid/server/coordinator/compact/CompactionStatusTest.java new file mode 100644 index 00000000000..0e13f8cd0e1 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/compact/CompactionStatusTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.compact; + +import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class CompactionStatusTest +{ + private static final String DS_WIKI = "wiki"; + + @Test + public void testFindPartitionsSpecWhenGivenIsNull() + { + final ClientCompactionTaskQueryTuningConfig tuningConfig + = ClientCompactionTaskQueryTuningConfig.from(null); + Assert.assertEquals( + new DynamicPartitionsSpec(null, Long.MAX_VALUE), + CompactionStatus.findPartitionsSpecFromConfig(tuningConfig) + ); + } + + @Test + public void testFindPartitionsSpecWhenGivenIsDynamicWithNullMaxTotalRows() + { + final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null); + final ClientCompactionTaskQueryTuningConfig tuningConfig + = ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec)); + Assert.assertEquals( + new DynamicPartitionsSpec(null, Long.MAX_VALUE), + CompactionStatus.findPartitionsSpecFromConfig(tuningConfig) + ); + } + + @Test + public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxTotalRows() + { + final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, 1000L); + final ClientCompactionTaskQueryTuningConfig tuningConfig + = ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec)); + Assert.assertEquals( + partitionsSpec, + CompactionStatus.findPartitionsSpecFromConfig(tuningConfig) + ); + } + + @Test + public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxRowsPerSegment() + { + final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(100, 1000L); + final ClientCompactionTaskQueryTuningConfig tuningConfig + = ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec)); + Assert.assertEquals( + partitionsSpec, + CompactionStatus.findPartitionsSpecFromConfig(tuningConfig) + ); + } + + @Test + public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues() + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( + "datasource", + null, + null, + 100, + null, + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + 1000L, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ), + null, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals( + new DynamicPartitionsSpec(100, 1000L), + CompactionStatus.findPartitionsSpecFromConfig( + ClientCompactionTaskQueryTuningConfig.from(config) + ) + ); + } + + @Test + public void testFindPartitionsSpecWhenGivenIsHashed() + { + final PartitionsSpec partitionsSpec = + new HashedPartitionsSpec(null, 100, Collections.singletonList("dim")); + final ClientCompactionTaskQueryTuningConfig tuningConfig + = ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec)); + Assert.assertEquals( + partitionsSpec, + CompactionStatus.findPartitionsSpecFromConfig(tuningConfig) + ); + } + + @Test + public void testFindPartitionsSpecWhenGivenIsRange() + { + final PartitionsSpec partitionsSpec = + new DimensionRangePartitionsSpec(null, 10000, Collections.singletonList("dim"), false); + final ClientCompactionTaskQueryTuningConfig tuningConfig + = ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec)); + Assert.assertEquals( + partitionsSpec, + CompactionStatus.findPartitionsSpecFromConfig(tuningConfig) + ); + } + + private static DataSourceCompactionConfig createCompactionConfig( + PartitionsSpec partitionsSpec + ) + { + return new DataSourceCompactionConfig( + DS_WIKI, + null, null, null, null, createTuningConfig(partitionsSpec), + null, null, null, null, null, null, null + ); + } + + private static UserCompactionTaskQueryTuningConfig createTuningConfig( + PartitionsSpec partitionsSpec + ) + { + return new UserCompactionTaskQueryTuningConfig( + null, + null, null, null, null, partitionsSpec, null, null, null, + null, null, null, null, null, null, null, null, null, null + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIteratorTest.java new file mode 100644 index 00000000000..a2765ccfc09 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIteratorTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.compact; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class DataSourceCompactibleSegmentIteratorTest +{ + @Test + public void testFilterSkipIntervals() + { + final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01"); + final List expectedSkipIntervals = ImmutableList.of( + Intervals.of("2018-01-15/2018-03-02"), + Intervals.of("2018-07-23/2018-10-01"), + Intervals.of("2018-10-02/2018-12-25"), + Intervals.of("2018-12-31/2019-01-01") + ); + final List skipIntervals = DataSourceCompactibleSegmentIterator.filterSkipIntervals( + totalInterval, + Lists.newArrayList( + Intervals.of("2017-12-01/2018-01-15"), + Intervals.of("2018-03-02/2018-07-23"), + Intervals.of("2018-10-01/2018-10-02"), + Intervals.of("2018-12-25/2018-12-31") + ) + ); + + Assert.assertEquals(expectedSkipIntervals, skipIntervals); + } + + @Test + public void testAddSkipIntervalFromLatestAndSort() + { + final List expectedIntervals = ImmutableList.of( + Intervals.of("2018-12-24/2018-12-25"), + Intervals.of("2018-12-29/2019-01-01") + ); + final List fullSkipIntervals = DataSourceCompactibleSegmentIterator.sortAndAddSkipIntervalFromLatest( + DateTimes.of("2019-01-01"), + new Period(72, 0, 0, 0), + null, + ImmutableList.of( + Intervals.of("2018-12-30/2018-12-31"), + Intervals.of("2018-12-24/2018-12-25") + ) + ); + + Assert.assertEquals(expectedIntervals, fullSkipIntervals); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java deleted file mode 100644 index 9c96e6fcdd8..00000000000 --- a/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java +++ /dev/null @@ -1,477 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.coordinator.compact; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; -import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; -import org.apache.druid.indexer.partitions.HashedPartitionsSpec; -import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.server.coordinator.DataSourceCompactionConfig; -import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; -import org.joda.time.Interval; -import org.joda.time.Period; -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; - -public class NewestSegmentFirstIteratorTest -{ - @Test - public void testFilterSkipIntervals() - { - final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01"); - final List expectedSkipIntervals = ImmutableList.of( - Intervals.of("2018-01-15/2018-03-02"), - Intervals.of("2018-07-23/2018-10-01"), - Intervals.of("2018-10-02/2018-12-25"), - Intervals.of("2018-12-31/2019-01-01") - ); - final List skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals( - totalInterval, - Lists.newArrayList( - Intervals.of("2017-12-01/2018-01-15"), - Intervals.of("2018-03-02/2018-07-23"), - Intervals.of("2018-10-01/2018-10-02"), - Intervals.of("2018-12-25/2018-12-31") - ) - ); - - Assert.assertEquals(expectedSkipIntervals, skipIntervals); - } - - @Test - public void testAddSkipIntervalFromLatestAndSort() - { - final List expectedIntervals = ImmutableList.of( - Intervals.of("2018-12-24/2018-12-25"), - Intervals.of("2018-12-29/2019-01-01") - ); - final List fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest( - DateTimes.of("2019-01-01"), - new Period(72, 0, 0, 0), - null, - ImmutableList.of( - Intervals.of("2018-12-30/2018-12-31"), - Intervals.of("2018-12-24/2018-12-25") - ) - ); - - Assert.assertEquals(expectedIntervals, fullSkipIntervals); - } - - @Test - public void testFindPartitionsSpecFromConfigWithNullTuningConfigReturnDynamicPartitinosSpecWithMaxTotalRowsOfLongMax() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new DynamicPartitionsSpec(null, Long.MAX_VALUE), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithNullMaxTotalRowsReturnLongMaxValue() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - null, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - null, - new DynamicPartitionsSpec(null, null), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new DynamicPartitionsSpec(null, Long.MAX_VALUE), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithNonNullMaxTotalRowsReturnGivenValue() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - null, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - null, - new DynamicPartitionsSpec(null, 1000L), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new DynamicPartitionsSpec(null, 1000L), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithNonNullMaxRowsPerSegmentReturnGivenValue() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - null, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - null, - new DynamicPartitionsSpec(100, 1000L), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new DynamicPartitionsSpec(100, 1000L), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - 100, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - 1000L, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new DynamicPartitionsSpec(100, 1000L), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndPartitionsSpecIgnoreDeprecatedOne() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - 100, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - null, - new DynamicPartitionsSpec(null, null), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new DynamicPartitionsSpec(null, Long.MAX_VALUE), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithDeprecatedMaxTotalRowsAndPartitionsSpecIgnoreDeprecatedOne() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - null, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - 1000L, - null, - new DynamicPartitionsSpec(null, null), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new DynamicPartitionsSpec(null, Long.MAX_VALUE), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithHashPartitionsSpec() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - null, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - null, - new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } - - @Test - public void testFindPartitionsSpecFromConfigWithRangePartitionsSpec() - { - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "datasource", - null, - null, - null, - null, - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - null, - new SingleDimensionPartitionsSpec(10000, null, "dim", false), - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - null, - null, - null, - null, - null, - null, - null - ); - Assert.assertEquals( - new SingleDimensionPartitionsSpec(10000, null, "dim", false), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) - ) - ); - } -} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java index 31e269f50f4..8f24a4ebb7e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java @@ -88,7 +88,7 @@ public class NewestSegmentFirstPolicyTest public void testLargeOffsetAndSmallSegmentInterval() { final Period segmentPeriod = new Period("PT1H"); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)), ImmutableMap.of( DATA_SOURCE, @@ -113,7 +113,7 @@ public class NewestSegmentFirstPolicyTest public void testSmallOffsetAndLargeSegmentInterval() { final Period segmentPeriod = new Period("PT1H"); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)), ImmutableMap.of( DATA_SOURCE, @@ -146,7 +146,7 @@ public class NewestSegmentFirstPolicyTest public void testLargeGapInData() { final Period segmentPeriod = new Period("PT1H"); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)), ImmutableMap.of( DATA_SOURCE, @@ -179,7 +179,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testHugeShard() { - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), ImmutableMap.of( DATA_SOURCE, @@ -229,7 +229,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testManySegmentsPerShard() { - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)), ImmutableMap.of( DATA_SOURCE, @@ -287,7 +287,7 @@ public class NewestSegmentFirstPolicyTest { final String unknownDataSource = "unknown"; final Period segmentPeriod = new Period("PT1H"); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of( unknownDataSource, createCompactionConfig(10000, new Period("P2D"), null), @@ -337,7 +337,7 @@ public class NewestSegmentFirstPolicyTest 2 ) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -374,7 +374,7 @@ public class NewestSegmentFirstPolicyTest ) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest ) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -412,7 +412,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D")) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -445,7 +445,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -471,7 +471,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -496,7 +496,7 @@ public class NewestSegmentFirstPolicyTest public void testWithSkipIntervals() { final Period segmentPeriod = new Period("PT1H"); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), ImmutableMap.of( DATA_SOURCE, @@ -536,7 +536,7 @@ public class NewestSegmentFirstPolicyTest public void testHoleInSearchInterval() { final Period segmentPeriod = new Period("PT1H"); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)), ImmutableMap.of( DATA_SOURCE, @@ -586,7 +586,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D")) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -635,7 +635,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D")) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -670,7 +670,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D")) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -696,7 +696,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final SegmentTimeline timeline = createTimeline( @@ -740,7 +740,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets segmentGranularity=DAY - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final SegmentTimeline timeline = createTimeline( @@ -773,7 +773,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets segmentGranularity=DAY - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -787,7 +787,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final SegmentTimeline timeline = createTimeline( @@ -806,7 +806,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets segmentGranularity=YEAR - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -830,7 +830,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final SegmentTimeline timeline = createTimeline( @@ -849,7 +849,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets segmentGranularity=YEAR - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -873,7 +873,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final SegmentTimeline timeline = createTimeline( @@ -887,7 +887,7 @@ public class NewestSegmentFirstPolicyTest // Duration of new segmentGranularity is the same as before (P1D), // but we changed the timezone from UTC to Bangkok in the auto compaction spec - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, @@ -925,7 +925,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final SegmentTimeline timeline = createTimeline( @@ -938,7 +938,7 @@ public class NewestSegmentFirstPolicyTest ); // Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, @@ -976,7 +976,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1004,7 +1004,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets rollup=true - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -1036,7 +1036,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1064,7 +1064,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets queryGranularity=MINUTE - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -1096,7 +1096,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have // Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1131,7 +1131,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets Dimensions=["foo"] - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1172,7 +1172,7 @@ public class NewestSegmentFirstPolicyTest Assert.assertFalse(iterator.hasNext()); // Auto compaction config sets Dimensions=null - iterator = policy.reset( + iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1195,7 +1195,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have // filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1251,7 +1251,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets filter=SelectorDimFilter("dim1", "bar", null) - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1292,7 +1292,7 @@ public class NewestSegmentFirstPolicyTest Assert.assertFalse(iterator.hasNext()); // Auto compaction config sets filter=null - iterator = policy.reset( + iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1319,7 +1319,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have // metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1375,7 +1375,7 @@ public class NewestSegmentFirstPolicyTest ); // Auto compaction config sets metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")} - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1416,7 +1416,7 @@ public class NewestSegmentFirstPolicyTest Assert.assertFalse(iterator.hasNext()); // Auto compaction config sets metricsSpec=null - iterator = policy.reset( + iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1440,7 +1440,7 @@ public class NewestSegmentFirstPolicyTest new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null) ); - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() @@ -1468,7 +1468,7 @@ public class NewestSegmentFirstPolicyTest // Different indexSpec as what is set in the auto compaction config IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build(); Map newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference>() {}); - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final SegmentTimeline timeline = createTimeline( @@ -1481,7 +1481,7 @@ public class NewestSegmentFirstPolicyTest ); // Duration of new segmentGranularity is the same as before (P1D) - final CompactionSegmentIterator iterator = policy.reset( + final CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, @@ -1517,7 +1517,7 @@ public class NewestSegmentFirstPolicyTest public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() { NullHandling.initializeForTests(); - PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); + PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null)); final SegmentTimeline timeline = createTimeline( new SegmentGenerateSpec( Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), @@ -1534,7 +1534,7 @@ public class NewestSegmentFirstPolicyTest ) ); - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1569,7 +1569,7 @@ public class NewestSegmentFirstPolicyTest ); Assert.assertFalse(iterator.hasNext()); - iterator = policy.reset( + iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig( 130000, new Period("P0D"), @@ -1608,7 +1608,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testSkipAllGranularityToDefault() { - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P0D"), @@ -1640,7 +1640,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testSkipFirstHalfEternityToDefault() { - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P0D"), @@ -1672,7 +1672,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testSkipSecondHalfOfEternityToDefault() { - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P0D"), @@ -1704,7 +1704,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testSkipAllToAllGranularity() { - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P0D"), @@ -1736,7 +1736,7 @@ public class NewestSegmentFirstPolicyTest @Test public void testSkipAllToFinerGranularity() { - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P0D"), @@ -1799,7 +1799,7 @@ public class NewestSegmentFirstPolicyTest 0, 1); - CompactionSegmentIterator iterator = policy.reset( + CompactionSegmentIterator iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P0D"), @@ -1850,7 +1850,7 @@ public class NewestSegmentFirstPolicyTest TombstoneShardSpec.INSTANCE, 0, 1); - iterator = policy.reset( + iterator = policy.createIterator( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P0D"),